All files code.txt•133 kB
=================
All files code: This text file consists of the code from all files in this project — easy to paste into your generative AI.
=================
dbs.yaml (root folder):
```
#----------------------------------------------------------------------------
# Cloud Database (With no direct values - Values from ENV VARS)
#----------------------------------------------------------------------------
databases:
# Railway MySQL
- alias: ${MYSQL_ALIAS:mysql}
dialect: mysql
host: ${MYSQL_HOST}
port: ${MYSQL_PORT:12498}
user: ${MYSQL_USER}
password: ${MYSQL_PASSWORD}
database: ${MYSQL_DB}
# Railway PostgreSQL
- alias: ${PG_ALIAS:pg}
dialect: pg
host: ${PG_HOST}
port: ${PG_PORT:55537}
user: ${PG_USER}
password: ${PG_PASSWORD}
database: ${PG_DB}
# Azure SQL Server
- alias: ${MSSQL_ALIAS:mssql}
dialect: mssql
host: ${MSSQL_HOST}
port: ${MSSQL_PORT:1433}
user: ${MSSQL_USER}
password: ${MSSQL_PASSWORD}
database: ${MSSQL_DB}
# Cloud Oracle - Docker to the Azure Container
- alias: ${ORACLE_ALIAS:oracle}
dialect: oracle
connectString: ${ORACLE_CONNECT_STRING}
user: ${ORACLE_USER}
password: ${ORACLE_PASSWORD}
```
.env (root folder):
```
#----------------------------------------------------------------------------
# DIFFERENT DATABASE CONFIGURATION EXAMPLES
#----------------------------------------------------------------------------
# 1) SQLite
# DB_PROVIDER=sqlite
# SQLITE_PATH=./sample.db
# 2) PostgreSQL
# DB_PROVIDER=postgres
# DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/appdb #postgres://USER:PASSWORD@HOST:5432/DBNAME
# 3) MySQL / MariaDB
# DB_PROVIDER=mysql
# DATABASE_URL=mysql://root:rootpass@127.0.0.1:3306/appdb #mysql://USER:PASSWORD@HOST:3306/DBNAME
# 4) SQL Server / Azure SQL
# DB_PROVIDER=mssql
# DATABASE_URL=Server=localhost,1433;Database=appdb;User Id=sa;Password=Passw0rd!;Encrypt=true;TrustServerCertificate=true
# Example (keep commented):
# Server=HOST,1433;Database=DB;User Id=USER;Password=PASS;Encrypt=true
# 5) Oracle
# DB_PROVIDER=oracle
# DATABASE_URL=USER/PASSWORD@HOST:1521/DBNAME
PROJECT_ENDPOINT=
AZURE_AI_KEY=
MODEL_DEPLOYMENT_NAME=
MCP_SERVER_URL=
```
package.json:
```
{
"name": "mcp-server-starter",
"version": "1.0.0",
"type": "module",
"scripts": {
"seed": "tsx src/db/seed.ts",
"dev:server": "tsx src/server/stdio.ts",
"dev:client": "tsx src/client/devClient.ts",
"build": "tsc",
"start": "node dist/server/http.js",
"postinstall": "npm run build",
"db:ping": "tsx dev/ping.ts",
"db:up": "docker-compose up -d",
"db:down": "docker-compose down",
"dev:http": "tsx src/server/http.ts"
},
"engines": {
"node": ">=20 <21"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.17.5",
"better-sqlite3": "^12.2.0",
"dotenv": "^17.2.1",
"js-yaml": "^4.1.0",
"mssql": "^11.0.1",
"mysql2": "^3.14.4",
"oracledb": "^6.9.0",
"pg": "^8.16.3",
"sqlite3": "^5.1.7",
"zod": "^3.25.76"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"@types/express": "^5.0.3",
"@types/js-yaml": "^4.0.9",
"@types/mssql": "^9.1.7",
"@types/node": "^24.3.1",
"@types/oracledb": "^6.9.1",
"@types/pg": "^8.15.5",
"and": "^0.0.3",
"express": "^5.1.0",
"i": "^0.3.7",
"npm": "^11.6.0",
"tsx": "^4.20.5",
"typescript": "^5.9.2"
}
}
```
client/devClient.ts
```
import "dotenv/config";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import {
ListToolsResultSchema,
CallToolResultSchema,
} from "@modelcontextprotocol/sdk/types.js";
// Dev harness: launches the stdio MCP server as a child process, discovers its
// tools, auto-selects a database alias, and exercises the three namespaced
// SQL tools (`<alias>.sql.schema|peek|query`) against it.
async function main() {
  // Spawn the server via tsx for dev convenience.
  const transport = new StdioClientTransport({
    command: process.platform === "win32" ? "npx.cmd" : "npx",
    args: ["tsx", "src/server/stdio.ts"],
  });
  const client = new Client({ name: "dev-client", version: "0.1.0" });
  await client.connect(transport);

  // === tools/list ===
  const listResult = await client.request(
    { method: "tools/list", params: {} },
    ListToolsResultSchema
  );
  const toolNames = listResult.tools.map((tool) => tool.name);
  console.log("Available tools:", toolNames);

  // Auto-detect the alias, honoring DEV_DB_ALIAS when it is valid.
  const preferred = process.env.DEV_DB_ALIAS?.trim();
  const alias = pickAlias(toolNames, preferred);
  if (!alias) {
    throw new Error(
      "No namespaced SQL tools found (expected '<alias>.sql.schema'). " +
        "Check your dbs.yaml and environment."
    );
  }
  console.log("Using DB alias:", alias, preferred ? `(preferred=${preferred})` : "");

  // === tools/call: <alias>.sql.schema ===
  const schemaResult = await client.request(
    {
      method: "tools/call",
      params: { name: `${alias}.sql.schema`, arguments: {} },
    },
    CallToolResultSchema
  );
  console.log(
    "\n=== sql.schema ===\n",
    schemaResult.content?.[0]?.type === "text" ? schemaResult.content[0].text : schemaResult
  );

  // === tools/call: <alias>.sql.peek ===
  const peekResult = await client.request(
    {
      method: "tools/call",
      params: {
        name: `${alias}.sql.peek`,
        arguments: {
          maxRowsPerTable: 50, // adjust if needed
          as: "json",
        },
      },
    },
    CallToolResultSchema
  );
  console.log(
    "\n=== sql.peek ===\n",
    peekResult.content?.[0]?.type === "text" ? peekResult.content[0].text : JSON.stringify(peekResult, null, 2)
  );

  // === tools/call: <alias>.sql.query ===
  const sample = detectSampleQuery();
  const queryResult = await client.request(
    {
      method: "tools/call",
      params: {
        name: `${alias}.sql.query`,
        arguments: {
          sql: sample.text,
          params: sample.params,
          readOnly: true,
          rowLimit: 10,
          as: "json",
        },
      },
    },
    CallToolResultSchema
  );
  console.log(
    "\n=== sql.query ===\n",
    queryResult.content?.[0]?.type === "text" ? queryResult.content[0].text : queryResult
  );

  await client.close();
}
// --- NEW: helper to pick a valid alias from tools/list (with optional preferred)
// Choose which database alias to use. The caller's `preferred` hint wins when
// that alias actually exposes the required `<alias>.sql.schema` tool;
// otherwise fall back to the first alias that does. Returns null (and warns)
// when no alias qualifies.
function pickAlias(names: string[], preferred?: string | null): string | null {
  const exposesSchema = (candidate: string) => names.includes(`${candidate}.sql.schema`);

  // Unique aliases in first-seen order (tool names are "<alias>.<rest>").
  const uniqueAliases: string[] = [];
  for (const toolName of names) {
    const candidate = toolName.split(".")[0];
    if (!uniqueAliases.includes(candidate)) {
      uniqueAliases.push(candidate);
    }
  }

  if (preferred && uniqueAliases.includes(preferred) && exposesSchema(preferred)) {
    return preferred;
  }

  const fallback = uniqueAliases.find(exposesSchema) ?? null;
  if (fallback === null) {
    console.warn("No alias exposes '.sql.schema'. Found aliases:", uniqueAliases);
  }
  return fallback;
}
// Build a trivial probe query for the configured dialect. Oracle cannot run a
// bare SELECT without a FROM clause, so it targets the `dual` pseudo-table.
function detectSampleQuery() {
  const provider = (process.env.DB_PROVIDER ?? "sqlite").toLowerCase();
  const text = provider.includes("oracle")
    ? "SELECT 1 AS one FROM dual"
    : "SELECT 1 AS one";
  return { text, params: {} };
}
// Entry point: surface any failure and exit non-zero so shells/CI notice.
main().catch((err) => {
  console.error("[dev-client] error:", err);
  process.exit(1);
});
```
client/agent.py:
```
# agent_mcp_cmd.py
"""
CMD-based demo: Azure AI Foundry Agent <-> MCP Server (HTTP)
- Prompts for username/password and verifies against your Cloud DB (MySQL/Postgres).
- On successful login, binds X-Role and X-User-Id to MCP HTTP session.
- Creates an Azure Agent with function tools that bridge to MCP tools.
- Lets the user chat; agent calls MCP tools to query your DBs (per RBAC/policies).
- Typing 'q' or 'quit' deletes the agent and closes the MCP session cleanly.
Requirements:
pip install python-dotenv requests azure-identity azure-ai-agents
# plus one of:
pip install mysql-connector-python # if using MySQL for login
pip install psycopg2-binary # if using Postgres for login
"""
import os
import sys
import json
import time
import getpass
from typing import Any, Dict, Optional, Tuple, Callable, Set
import requests
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.ai.agents import AgentsClient
from azure.ai.agents.models import (
FunctionTool,
RequiredFunctionToolCall,
ToolOutput,
SubmitToolOutputsAction,
ListSortOrder,
)
# ---------- Load env ----------
load_dotenv()
# BUG FIX: os.environ["X"] raises KeyError when X is missing, so the friendly
# "Missing env" check below could never run. Use .get() so it is reachable.
PROJECT_ENDPOINT = os.environ.get("PROJECT_ENDPOINT", "")
MODEL_DEPLOYMENT_NAME = os.environ.get("MODEL_DEPLOYMENT_NAME", "")
MCP_SERVER_URL = os.environ.get("MCP_SERVER_URL", "").rstrip("/")
if not PROJECT_ENDPOINT or not MODEL_DEPLOYMENT_NAME or not MCP_SERVER_URL:
    print("❌ Missing env: PROJECT_ENDPOINT, MODEL_DEPLOYMENT_NAME, MCP_SERVER_URL")
    sys.exit(1)
# Reduce noisy logs unless debugging
os.environ.setdefault("AZURE_LOG_LEVEL", "warning")
# ---------- DB Login (MySQL/Postgres) ----------
def db_login_loop() -> Tuple[str, str, str]:
    """
    Repeatedly prompt for username/password and verify against the login DB.

    Returns:
        (role, user_id, username) for the authenticated user.

    Exits the process when no usable DB configuration is present.

    SECURITY (review notes):
    - Passwords are compared in plain text; production must store hashes
      (e.g., bcrypt) and compare those instead.
    - Table/column names come from env vars and are interpolated into the SQL
      text; only the username/password values are parameterized. Keep those
      env vars trusted.
    """
    # Config for auth table/columns (override by env if needed)
    table = os.environ.get("AUTH_TABLE", "users")
    col_user = os.environ.get("AUTH_USER_COL", "username")
    col_pass = os.environ.get("AUTH_PASS_COL", "password")
    col_role = os.environ.get("AUTH_ROLE_COL", "role")
    col_userid = os.environ.get("AUTH_ID_COL", "user_id")

    # Dialect from env hints (prefer explicit), else derive from connection envs.
    dialect = (os.environ.get("DB_PROVIDER") or os.environ.get("DB_DIALECT") or "").lower()
    if not dialect:
        dialect = "mysql" if os.environ.get("MYSQL_HOST") else ("pg" if os.environ.get("PG_HOST") else "")
    if dialect not in ("mysql", "pg"):
        print("❌ No DB_PROVIDER (mysql|pg) set and no MYSQL_HOST/PG_HOST present.")
        print(" Please configure your login DB connection envs.")
        sys.exit(1)
    print(f"[login] Using {dialect.upper()} for credential verification.")

    # Both drivers use %s placeholders, so one statement serves both dialects
    # (previously the identical SQL text was duplicated per branch).
    sql = f"""
        SELECT {col_role}, {col_userid}
        FROM {table}
        WHERE {col_user} = %s AND {col_pass} = %s
        LIMIT 1
    """

    while True:
        username = input("Login username: ").strip()
        # Use getpass for password masking in CMD
        password = getpass.getpass("Login password: ").strip()
        try:
            conn = _connect_login_db(dialect)
            try:
                with conn.cursor() as cur:
                    cur.execute(sql, (username, password))
                    row = cur.fetchone()
            finally:
                # Close even when the query raises; the original leaked the
                # connection on query errors.
                conn.close()
            if not row:
                print("⚠️ Invalid credentials. Please try again.\n")
                continue
            role, user_id = str(row[0]), str(row[1])
            print(f"[login] Authenticated. role={role} user_id={user_id}")
            return role, user_id, username
        except Exception as ex:
            print(f"❌ DB error: {ex}")
            print(" Please verify your DB env and try again.\n")
            time.sleep(0.8)


def _connect_login_db(dialect: str):
    """Open a DB-API connection for the given dialect ('mysql' or 'pg')."""
    if dialect == "mysql":
        import mysql.connector  # mysql-connector-python
        return mysql.connector.connect(
            host=os.environ["MYSQL_HOST"],
            port=int(os.environ.get("MYSQL_PORT", "3306")),
            user=os.environ["MYSQL_USER"],
            password=os.environ["MYSQL_PASSWORD"],
            database=os.environ["MYSQL_DB"],
        )
    import psycopg2  # psycopg2-binary
    return psycopg2.connect(
        host=os.environ["PG_HOST"],
        port=int(os.environ.get("PG_PORT", "5432")),
        user=os.environ["PG_USER"],
        password=os.environ["PG_PASSWORD"],
        dbname=os.environ["PG_DB"],
    )
# ---------- Minimal MCP HTTP client ----------
class McpHttpClient:
    """
    Minimal JSON-RPC client for the MCP HTTP endpoint (/mcp).

    Lifecycle: set_identity() -> initialize() -> ready() -> tools_call() -> close_session().
    - POST initialize → server replies with an `mcp-session-id` response header.
    - Subsequent requests carry mcp-session-id plus X-Role / X-User-Id.
    - Tools are invoked via 'tools/call' with {name, arguments}.
    """

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.sid: Optional[str] = None  # session id; populated by initialize()
        self.headers: Dict[str, str] = {
            "Content-Type": "application/json",
            # Streamable HTTP supports JSON or SSE responses; accept both:
            "Accept": "application/json, text/event-stream",
        }

    def set_identity(self, role: str, user_id: str):
        """Bind the authenticated identity; the server's RBAC reads these headers."""
        self.headers["x-role"] = role      # RBAC / alias allowlist
        self.headers["x-user-id"] = user_id  # row filters use :user_id via userContext

    def _post(self, payload: Dict[str, Any]) -> "requests.Response":
        # String annotation keeps `requests` a call-time-only dependency.
        return requests.post(self.url, headers=self.headers, data=json.dumps(payload), timeout=60)

    @staticmethod
    def _parse_mcp_response(text: str) -> Dict[str, Any]:
        """
        Parse either a plain-JSON body or an SSE body
        ('event: message\\ndata: {...}\\n\\n'); for SSE, the last data record wins.
        """
        t = text.strip()
        if t.startswith("event:") or t.startswith("data:"):
            data_lines = [ln for ln in t.splitlines() if ln.startswith("data:")]
            if not data_lines:
                raise ValueError(f"No 'data:' block in SSE: {t[:200]}...")
            # BUG FIX: SSE permits 'data:' with or without a trailing space;
            # the old slice of len("data: ") dropped the first JSON byte when
            # the space was absent.
            payload = data_lines[-1][len("data:"):].lstrip()
            return json.loads(payload)
        return json.loads(t)

    def initialize(self):
        """POST the JSON-RPC initialize request and capture the session id header."""
        payload = {
            "jsonrpc": "2.0",
            "id": "1",
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "clientInfo": {"name": "agents-bridge-client", "version": "1.0.0"},
                "capabilities": {"roots": {"listChanged": True}, "sampling": {}, "tools": {}}
            }
        }
        r = self._post(payload)
        r.raise_for_status()
        sid = r.headers.get("mcp-session-id")
        if not sid:
            raise RuntimeError("MCP server did not return mcp-session-id header.")
        self.sid = sid

    def ready(self):
        """Send the 'initialized' notification under the captured session id."""
        assert self.sid, "Call initialize() first"
        self.headers["mcp-session-id"] = self.sid
        payload = {"jsonrpc": "2.0", "method": "notifications/initialized"}
        # server does not require body parsing here
        self._post(payload)

    def tools_call(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str:
        """
        Execute a tool and return a single text result for easier display.
        The server returns content=[{type:'text'|'json'}]; we stringify to text.
        Tool names include discovery (db.aliases/types/names/listByType) and
        namespaced SQL tools: "<alias>.sql.schema|peek|query".
        """
        assert self.sid, "Call initialize() first"
        args = arguments if arguments is not None else {}
        payload = {
            "jsonrpc": "2.0",
            "id": "call-1",
            "method": "tools/call",
            "params": {"name": name, "arguments": args}
        }
        r = self._post(payload)
        r.raise_for_status()
        obj = self._parse_mcp_response(r.text)
        result = obj.get("result") or {}
        content = result.get("content") or []
        if not content:
            return "[]"
        item = content[0]
        ctype = item.get("type")
        if ctype == "text":
            return item.get("text", "")
        if ctype == "json":
            try:
                return json.dumps(item.get("json"), ensure_ascii=False)
            except Exception:
                return str(item.get("json"))
        return json.dumps(obj, ensure_ascii=False)

    def close_session(self):
        """
        Cleanly delete the server-side MCP session (best-effort).
        """
        if not self.sid:
            return
        try:
            requests.delete(self.url, headers=self.headers, timeout=30)
        except Exception:
            pass
        self.sid = None
# ---------- Function tools (Agent -> MCP bridge) ----------
def build_function_tools(mcp: McpHttpClient) -> FunctionTool:
    """
    Build the FunctionTool set the Azure agent may invoke.

    Each wrapper closes over `mcp` and forwards to the matching MCP tool,
    keeping the agent itself general-purpose.
    """
    def db_aliases() -> str:
        return mcp.tools_call("db.aliases", {})

    def db_types() -> str:
        return mcp.tools_call("db.types", {})

    def db_names() -> str:
        return mcp.tools_call("db.names", {})

    def db_list_by_type(type: str, unique: bool = True, includeAliases: bool = False) -> str:
        return mcp.tools_call(
            "db.listByType",
            {"type": type, "unique": unique, "includeAliases": includeAliases},
        )

    def sql_schema(alias: str) -> str:
        return mcp.tools_call(f"{alias}.sql.schema", {})

    def sql_peek(alias: str, maxRowsPerTable: int = 50, as_: str = "markdown") -> str:
        return mcp.tools_call(f"{alias}.sql.peek", {"maxRowsPerTable": maxRowsPerTable, "as": as_})

    def sql_query(alias: str, sql: str, params: Optional[dict] = None,
                  readOnly: bool = True, rowLimit: int = 1000, as_: str = "json") -> str:
        return mcp.tools_call(
            f"{alias}.sql.query",
            {"sql": sql, "params": params or {}, "readOnly": readOnly, "rowLimit": rowLimit, "as": as_},
        )

    bridged: Set[Callable[..., Any]] = {
        db_aliases, db_types, db_names, db_list_by_type, sql_schema, sql_peek, sql_query
    }
    return FunctionTool(functions=bridged)
# ---------- Azure Agent run helpers ----------
# Run states after which polling must stop.
TERMINAL_STATES = {"completed", "failed", "expired", "cancelled"}


def normalize_status(run) -> str:
    """
    Return the run's status as a lowercase string ('' when absent).

    SDK status values may be enum-like objects (carrying .value or .name) or
    plain strings; normalize all of them for comparison with TERMINAL_STATES.
    """
    status = getattr(run, "status", None)
    if status is None:
        return ""
    for candidate in ("value", "name"):
        if hasattr(status, candidate):
            try:
                return str(getattr(status, candidate)).lower()
            except Exception:
                pass  # fall through to the generic str() form
    return str(status).lower()
def poll_until_terminal(client: AgentsClient, thread_id: str, run_id: str, interval: float = 1.0):
    """
    Poll a run until it reaches a terminal state, bridging tool calls.

    While the run requires action, executes the requested function tool calls
    through the module-global FUNCTIONS bridge and submits their outputs back.

    NOTE(review): relies on FUNCTIONS being assigned by main() before the
    first run is polled — confirm call order if this helper is reused.
    """
    last_status = None
    while True:
        run = client.runs.get(thread_id=thread_id, run_id=run_id)
        status = normalize_status(run)
        if status != last_status:
            print(f"[debug] run status -> {status}")
            last_status = status
        if status in TERMINAL_STATES:
            return run
        # Tool bridge: substring match tolerates status spellings such as
        # 'requires_action' vs 'runstatus.requires_action'.
        if "requires_action" in status and isinstance(getattr(run, "required_action", None), SubmitToolOutputsAction):
            tool_calls = run.required_action.submit_tool_outputs.tool_calls
            outputs = []
            for tc in tool_calls:
                print(f"[debug] tool_call: name={getattr(tc,'name','?')} args={getattr(tc,'arguments',{})}")
                if isinstance(tc, RequiredFunctionToolCall):
                    try:
                        # Execute locally defined FunctionTool
                        out = FUNCTIONS.execute(tc)
                    except Exception as ex:
                        out = f"ERROR executing '{getattr(tc,'name','?')}': {ex}"
                    outputs.append(ToolOutput(tool_call_id=tc.id, output=out))
            if outputs:
                client.runs.submit_tool_outputs(thread_id=thread_id, run_id=run_id, tool_outputs=outputs)
        time.sleep(interval)
# ---------- Main ----------
def main():
    """
    CMD entry point.

    Flow:
      1) Verify credentials against the cloud DB, capturing (role, user_id).
      2) Bind that identity to a fresh MCP HTTP session (X-Role / X-User-Id).
      3) Create an ephemeral Azure agent wired to the MCP bridge functions.
      4) Chat loop until the user quits; then delete the agent and session.
    """
    # 1) Login and bind identity to MCP
    role, user_id, username = db_login_loop()
    mcp = McpHttpClient(url=MCP_SERVER_URL)
    mcp.set_identity(role=role, user_id=user_id)  # identity headers the server's RBAC reads
    mcp.initialize()  # POST initialize -> server returns mcp-session-id header
    mcp.ready()
    # FUNCTIONS must be module-global: poll_until_terminal() reaches for it
    # directly instead of receiving it as a parameter.
    global FUNCTIONS
    FUNCTIONS = build_function_tools(mcp)
    # 2) Azure Agents client
    agents_client = AgentsClient(
        endpoint=PROJECT_ENDPOINT,
        credential=DefaultAzureCredential(
            exclude_environment_credential=True,
            exclude_managed_identity_credential=True,
        ),
    )
    # 3) Create agent with instructions tailored to RBAC + row filters.
    # Ensures "my" resolves to the logged-in user without follow-up questions.
    # NOTE(review): default_alias_hint is only referenced by the commented-out
    # draft instructions below; the active instructions hard-code the aliases.
    default_alias_hint = {
        "customer": "customer_db",
        "customer_admin": "customer_db",
        "merchant": "merchant_db",
        "merchant_admin": "merchant_db",
    }.get(role, None)
    # (earlier, more generic instruction draft kept for reference)
    # instructions = f"""
    # You are assisting a signed-in user.
    # - username: {username}
    # - role: {role}
    # - user_id: {user_id}
    # Behavior:
    # - Treat "my ..." as referring to user_id={user_id}.
    # - Do NOT ask who the user is; you already know.
    # - Use SQL tools via the provided functions (sql_schema / sql_peek / sql_query).
    # - Prefer named parameters (e.g., :user_id) and small result sets.
    # - If role is not 'admin', avoid discovery tools unless needed; rely on default alias.
    # - Default alias: {default_alias_hint or "(none)"} (use this unless the user explicitly chooses another allowed alias).
    # - Examples the user may ask:
    # "What is my current total amount of points?"
    # → Call sql_schema (once if needed), then sql_query on the default alias with a SELECT that aggregates from the relevant table(s),
    # using :user_id and LIMIT/TOP/ROWNUM as appropriate for the dialect.
    # Important:
    # - Your access is scoped by the server using X-Role and X-User-Id headers.
    # - If a query is rejected, adjust to allowed tables or apply row filters (e.g., WHERE user_id = :user_id).
    # """.strip()
    instructions = f"""
You are assisting a signed-in user.
- username: {username}
- role: {role}
- user_id: {user_id}
Identity & pronouns
- Treat any phrase like “my points”, “my purchases”, “my account” as referring to user_id={user_id}.
- Do NOT ask the user who they are; you already know from the session headers.
Alias selection
- Default to alias **customer_db** for any question about the user’s account, points, or purchase history.
- Use alias **merchant_db** ONLY when the user wants to browse or ask about items/products (catalog browsing).
Allowed tables (customer role)
- In **customer_db**: you may query ONLY these tables: `users`, `purchase_history`, `points_history`.
- In **merchant_db**: you may query ONLY the `items` table.
- If you attempt a table outside these lists, adjust your plan to an allowed table and try again.
Tool usage
- Prefer `<alias>.sql.query` for answers. Call `<alias>.sql.schema` once if you need to confirm the exact column names.
- Do NOT use `sql.peek` for customer questions.
- Discovery tools (db.aliases/types/names) are unnecessary; you already know which aliases to use for this role.
SQL rules
- Use **read-only SELECT** statements with **named parameters** (e.g., `:user_id`, `:limit`).
- Keep results small. Always include a limit (LIMIT / TOP / ROWNUM depending on dialect).
- For personal data in **customer_db**, ALWAYS include a `WHERE user_id = :user_id` filter.
- For **merchant_db.items**, no user filter is required unless specified (e.g., `WHERE is_active = 1`).
Examples
- “What is my current total amount of points?”
→ alias=customer_db; query points from `points_history` with `WHERE user_id = :user_id`.
Example (generic): SELECT SUM(points) AS total_points FROM points_history WHERE user_id = :user_id;
- “Show my last 5 purchases.”
→ alias=customer_db; query `purchase_history` filtered by user and ordered by recency.
Example: SELECT purchase_id, item_id, total_price, purchase_date
FROM purchase_history
WHERE user_id = :user_id
ORDER BY purchase_date DESC
LIMIT :limit; (set :limit = 5)
- “List available items.”
→ alias=merchant_db; query `items` and return a concise list with name/price/availability.
Example: SELECT item_id, name, price, availability_status
FROM items
WHERE is_active = 1
ORDER BY name ASC
LIMIT :limit; (e.g., :limit = 10)
Error handling
- If a call fails with a policy/permission error, switch to the allowed alias/table and add required filters (e.g., `user_id = :user_id`), then retry.
Response style
- Return concise answers with the computed values (e.g., the total points number) and a short summary. Avoid exposing raw SQL unless the user asks for it.
""".strip()
    with agents_client:
        agent = agents_client.create_agent(
            model=MODEL_DEPLOYMENT_NAME,
            name="mcp-sql-agent",
            instructions=instructions,
            tools=FUNCTIONS.definitions,
        )
        print(f"Agent created: {agent.id}")
        thread = agents_client.threads.create()
        print(f"Thread created: {thread.id}")
        try:
            while True:
                prompt = input("\nAsk something (or 'quit'/'q'): ").strip()
                if prompt.lower() in ("quit", "q", "exit"):
                    break
                agents_client.messages.create(thread_id=thread.id, role="user", content=prompt)
                run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id)
                run = poll_until_terminal(agents_client, thread.id, run.id)
                # Show conversation as simple alternating blocks
                try:
                    msgs = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)
                    print("\nConversation:")
                    print("=" * 80)
                    for m in msgs:
                        if m.text_messages:
                            for tm in m.text_messages:
                                print(f"{m.role.upper()}: {tm.text.value}\n")
                    print("=" * 80)
                except Exception as e:
                    print("⚠️ Could not list messages:", e)
        finally:
            # Cleanup: delete agent and close MCP session (both best-effort)
            try:
                agents_client.delete_agent(agent.id)
                print(f"Deleted agent: {agent.id}")
            except Exception:
                pass
            try:
                mcp.close_session()
                print("Closed MCP session.")
            except Exception:
                pass
# Script entry point (no effect when imported as a module).
if __name__ == "__main__":
    main()
```
client/client.py:
```
# client.py
import os
import json
import time
import requests
from typing import Any, Dict, Optional, Set, Callable, Tuple
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.ai.agents import AgentsClient
from azure.ai.agents.models import (
FunctionTool,
SubmitToolOutputsAction,
ToolOutput,
RequiredFunctionToolCall,
ListSortOrder,
)
# ========== Load env ==========
load_dotenv()
PROJECT_ENDPOINT = os.environ["PROJECT_ENDPOINT"]
MODEL_DEPLOYMENT_NAME = os.environ["MODEL_DEPLOYMENT_NAME"]
MCP_SERVER_URL = os.environ["MCP_SERVER_URL"].rstrip("/")
# Verbose logs (optional)
os.environ.setdefault("AZURE_LOG_LEVEL", "warning")
# ========== Railway DB login (role + user_id) ==========
def railway_login() -> Tuple[str, str]:
    """
    Authenticate the operator against the Railway-hosted login DB.

    Prompts for username/password, looks the user up in the auth table, and
    returns (role, user_id). Falls back to demo identities when no DB is
    configured or the lookup fails.

    Defaults:
        - table: users
        - columns: username, password, role, user_id

    SECURITY (review notes):
    - Passwords are compared in plain text; production must store hashes.
    - Table/column names come from env vars and are interpolated into the SQL
      text; only the credential values are parameterized.
    - Falling back to role='admin' on DB *errors* is dangerous outside demos.
    """
    import getpass  # local import; mirrors agent.py's masked password prompt

    # Prompt user (password masked, consistent with agent.py's login flow)
    username = input("Login username: ").strip()
    password = getpass.getpass("Login password: ").strip()

    # Config (override via env if your schema differs)
    table = os.environ.get("AUTH_TABLE", "users")
    col_user = os.environ.get("AUTH_USER_COL", "username")
    col_pass = os.environ.get("AUTH_PASS_COL", "password")
    col_role = os.environ.get("AUTH_ROLE_COL", "role")
    col_userid = os.environ.get("AUTH_ID_COL", "user_id")

    # Determine DB type from env (mysql | pg), else infer from connection envs
    dialect = (os.environ.get("DB_PROVIDER") or os.environ.get("DB_DIALECT") or "").lower()
    if not dialect:
        dialect = "mysql" if os.environ.get("MYSQL_HOST") else ("pg" if os.environ.get("PG_HOST") else "")
    if dialect not in ("mysql", "pg"):
        print("[login] No DB_PROVIDER set (mysql|pg). Using default role='admin', user_id='test_user'.")
        return ("admin", "test_user")

    # Both drivers use %s placeholders, so one statement serves both dialects.
    # NOTE: In production use hashed passwords; this demo assumes plain text.
    sql = f"""
        SELECT {col_role}, {col_userid}
        FROM {table}
        WHERE {col_user} = %s AND {col_pass} = %s
        LIMIT 1
    """
    try:
        if dialect == "mysql":
            import mysql.connector  # mysql-connector-python
            conn = mysql.connector.connect(
                host=os.environ["MYSQL_HOST"],
                port=int(os.environ.get("MYSQL_PORT", "3306")),
                user=os.environ["MYSQL_USER"],
                password=os.environ["MYSQL_PASSWORD"],
                database=os.environ["MYSQL_DB"],
            )
        else:  # pg
            import psycopg2  # psycopg2-binary
            conn = psycopg2.connect(
                host=os.environ["PG_HOST"],
                port=int(os.environ.get("PG_PORT", "5432")),
                user=os.environ["PG_USER"],
                password=os.environ["PG_PASSWORD"],
                dbname=os.environ["PG_DB"],
            )
        try:
            with conn.cursor() as cur:
                cur.execute(sql, (username, password))
                row = cur.fetchone()
        finally:
            # Close even when the query raises; the original leaked the
            # connection on query errors.
            conn.close()
        if not row:
            print("[login] Invalid credentials. Defaulting to role='customer' user_id='1' for demo.")
            return ("customer", "1")
        role, user_id = str(row[0]), str(row[1])
        print(f"[login] Authenticated. role={role} user_id={user_id}")
        return (role, user_id)
    except Exception as ex:
        print(f"[login] DB error ({dialect}), defaulting to admin/test_user: {ex}")
        return ("admin", "test_user")
# ========== Minimal MCP HTTP client (same flow as your toolList.py) ==========
class McpHttpClient:
    """
    Minimal JSON-RPC client for the MCP HTTP endpoint (same flow as toolList.py).

    Lifecycle: update_identity() -> initialize() -> ready() -> tools_call().
    The server returns an `mcp-session-id` header from initialize(); later
    requests must carry it together with the x-role / x-user-id headers.
    """

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.sid: Optional[str] = None  # session id; populated by initialize()
        self.headers: Dict[str, str] = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            # x-role / x-user-id set after login
        }

    def update_identity(self, role: str, user_id: str):
        """Update identity headers; call before initialize()"""
        self.headers["x-role"] = role
        self.headers["x-user-id"] = user_id

    def _post(self, payload: Dict[str, Any]) -> "requests.Response":
        # String annotation keeps `requests` a call-time-only dependency.
        return requests.post(self.url, headers=self.headers, data=json.dumps(payload), timeout=60)

    @staticmethod
    def _parse_response(text: str) -> Dict[str, Any]:
        """Parse a plain-JSON body or the last 'data:' record of an SSE body."""
        t = text.strip()
        if t.startswith("event:") or t.startswith("data:"):
            data_lines = [ln for ln in t.splitlines() if ln.startswith("data:")]
            if not data_lines:
                raise ValueError(f"No 'data:' block in SSE: {t[:200]}...")
            # BUG FIX: SSE permits 'data:' with or without a trailing space;
            # slicing len("data: ") dropped the first JSON byte when absent.
            payload = data_lines[-1][len("data:"):].lstrip()
            return json.loads(payload)
        return json.loads(t)

    def initialize(self):
        """POST the JSON-RPC initialize request and capture the session id header."""
        payload = {
            "jsonrpc": "2.0",
            "id": "1",
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "clientInfo": {"name": "agents-bridge-client", "version": "1.0.0"},
                "capabilities": {"roots": {"listChanged": True}, "sampling": {}, "tools": {}}
            }
        }
        r = self._post(payload)
        r.raise_for_status()
        sid = r.headers.get("mcp-session-id")
        if not sid:
            raise RuntimeError("MCP server did not return mcp-session-id in headers.")
        self.sid = sid

    def ready(self):
        """Send the 'initialized' notification under the captured session id."""
        assert self.sid, "Call initialize() first"
        self.headers["mcp-session-id"] = self.sid
        payload = {"jsonrpc": "2.0", "method": "notifications/initialized"}
        self._post(payload)  # ignore body

    def tools_call(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str:
        """
        Call an MCP tool and return a text payload suitable for Agent ToolOutput.
        We coerce MCP results (content=[{type:'json'|'text'}]) into a single string.
        """
        assert self.sid, "Call initialize() first"
        args = arguments if arguments is not None else {}
        payload = {
            "jsonrpc": "2.0",
            "id": "call-1",
            "method": "tools/call",
            "params": {"name": name, "arguments": args}
        }
        r = self._post(payload)
        r.raise_for_status()
        obj = self._parse_response(r.text)
        result = obj.get("result") or {}
        content = result.get("content") or []
        if not content:
            return "[]"
        item = content[0]
        ctype = item.get("type")
        if ctype == "text":
            return item.get("text", "")
        if ctype == "json":
            try:
                return json.dumps(item.get("json"), ensure_ascii=False)
            except Exception:
                return str(item.get("json"))
        return json.dumps(obj, ensure_ascii=False)
# Single module-wide MCP client shared by every bridged function below.
# Identity headers (x-role/x-user-id) are expected to be set via
# update_identity() before the first tool call.
_mcp = McpHttpClient(MCP_SERVER_URL)
_mcp_initialized = False
def _ensure_mcp_session():
    # Lazily open the MCP session on first tool use; idempotent thereafter.
    global _mcp_initialized
    if not _mcp_initialized:
        _mcp.initialize()
        _mcp.ready()
        _mcp_initialized = True
# ========== Function tools (generalized) ==========
def db_aliases() -> str:
"""
Return list of available database aliases as a JSON string.
:return: JSON string array of aliases.
"""
_ensure_mcp_session()
return _mcp.tools_call("db.aliases", {})
def db_types() -> str:
"""
Return list of available database dialects as a JSON string.
:return: JSON string array (e.g., ["mysql","pg","mssql","oracle","sqlite"])
"""
_ensure_mcp_session()
return _mcp.tools_call("db.types", {})
def db_names() -> str:
"""
Return list of database names (not aliases) as a JSON string.
:return: JSON string array of names.
"""
_ensure_mcp_session()
return _mcp.tools_call("db.names", {})
def db_list_by_type(type: str, unique: bool = True, includeAliases: bool = False) -> str:
"""
List databases for a given dialect.
:param type: One of mysql | pg | mssql | oracle | sqlite
:param unique: If true, unique names; else one row per alias.
:param includeAliases: If true, include alias along with name.
:return: JSON string array (names or objects with alias+name).
"""
_ensure_mcp_session()
args = {"type": type, "unique": unique, "includeAliases": includeAliases}
return _mcp.tools_call("db.listByType", args)
def sql_schema(alias: str) -> str:
"""
Return a compact Markdown outline of tables and columns for the given alias.
:param alias: Database alias (e.g., "customer_db", "merchant_db")
:return: Markdown string.
"""
_ensure_mcp_session()
return _mcp.tools_call(f"{alias}.sql.schema", {})
def sql_peek(alias: str, maxRowsPerTable: int = 50, as_: str = "markdown") -> str:
"""
Peek into content for the given alias.
:param alias: Database alias
:param maxRowsPerTable: 1..10000
:param as_: "markdown" | "json"
:return: Markdown or JSON text (stringified).
"""
_ensure_mcp_session()
args = {"maxRowsPerTable": maxRowsPerTable, "as": as_}
return _mcp.tools_call(f"{alias}.sql.peek", args)
def sql_query(alias: str, sql: str, params: Optional[dict] = None,
              readOnly: bool = True, rowLimit: int = 1000, as_: str = "json") -> str:
    """
    Execute a parameterized SQL query against the aliased database.

    :param alias: Database alias.
    :param sql: SELECT query string.
    :param params: Optional named-parameter mapping.
    :param readOnly: When true, only SELECT statements are permitted.
    :param rowLimit: Upper bound on returned rows.
    :param as_: Output format, "json" or "markdown".
    :return: Stringified JSON or Markdown result.
    """
    _ensure_mcp_session()
    payload = {
        "sql": sql,
        "params": params if params is not None else {},
        "readOnly": readOnly,
        "rowLimit": rowLimit,
        "as": as_,
    }
    return _mcp.tools_call(f"{alias}.sql.query", payload)
# ========== Build FunctionTool set ==========
# NOTE: import-time order matters — the tool functions above must already be
# defined here, because FunctionTool snapshots this set when constructed.
USER_FUNCTIONS: Set[Callable[..., Any]] = {
    db_aliases,
    db_types,
    db_names,
    db_list_by_type,
    sql_schema,
    sql_peek,
    sql_query,
}
FUNCTIONS = FunctionTool(functions=USER_FUNCTIONS)  # Agent can call these tools
# ========== Run helpers ==========
# Lowercase run states that end the polling loop (compare via normalize_status).
TERMINAL = {"completed", "failed", "expired", "cancelled"}
def normalize_status(run) -> str:
    """Return the run's status as a lowercase string ("" when absent).

    Handles plain strings as well as enum-like objects exposing .value/.name.
    """
    raw = getattr(run, "status", None)
    if raw is None:
        return ""
    for candidate in ("value", "name"):
        try:
            return str(getattr(raw, candidate)).lower()
        except AttributeError:
            continue
        except Exception:
            continue
    return str(raw).lower()
def poll_until_terminal(client: AgentsClient, thread_id: str, run_id: str, interval: float = 1.0):
    """Poll a run until it reaches a terminal state, servicing tool calls on the way.

    :param client: AgentsClient used for polling and tool-output submission.
    :param thread_id: Thread the run belongs to.
    :param run_id: Run to poll.
    :param interval: Seconds to sleep between polls.
    :return: The final run object (its status is in TERMINAL).
    """
    last_status = None
    while True:
        run = client.runs.get(thread_id=thread_id, run_id=run_id)
        status = normalize_status(run)
        if status != last_status:
            # Log only on transitions, not every poll tick.
            print(f"[debug] run status -> {status}")
            last_status = status
        if status in TERMINAL:
            return run
        # Substring check tolerates SDK variations in how the status is rendered.
        if "requires_action" in status and isinstance(getattr(run, "required_action", None), SubmitToolOutputsAction):
            tool_calls = run.required_action.submit_tool_outputs.tool_calls
            outputs = []
            for tc in tool_calls:
                print(f"[debug] tool_call: name={getattr(tc,'name','?')} args={getattr(tc,'arguments',{})}")
                if isinstance(tc, RequiredFunctionToolCall):
                    try:
                        out = FUNCTIONS.execute(tc)  # bridges to MCP HTTP
                    except Exception as ex:
                        # Surface the failure to the model instead of aborting the run.
                        out = f"ERROR executing function '{getattr(tc,'name','?')}': {ex}"
                    outputs.append(ToolOutput(tool_call_id=tc.id, output=out))
            if outputs:
                client.runs.submit_tool_outputs(thread_id=thread_id, run_id=run_id, tool_outputs=outputs)
        time.sleep(interval)
# ========== Main ==========
def main():
    """Interactive client: login, create an identity-scoped agent, chat until 'quit'."""
    # 1) Login (Railway DB) -> get role + user_id, bind to MCP headers
    role, user_id = railway_login()
    _mcp.update_identity(role, user_id)  # must be before initialize
    _ensure_mcp_session()  # session created using this identity
    # 2) Azure Agents client
    agents_client = AgentsClient(
        endpoint=PROJECT_ENDPOINT,
        credential=DefaultAzureCredential(
            exclude_environment_credential=True,
            exclude_managed_identity_credential=True,
        ),
    )
    # 3) Create agent with generalized function tools
    with agents_client:
        # Earlier, identity-agnostic variant kept for reference:
        # agent = agents_client.create_agent(
        #     model=MODEL_DEPLOYMENT_NAME,
        #     name="sql-mcp-bridge-agent",
        #     instructions=(
        #         "You can use the provided tools to answer questions.\n"
        #         "- Use db_aliases/db_types/db_names/db_list_by_type to discover databases.\n"
        #         "- When inspecting or querying a specific database, call sql_schema/peek/query and "
        #         "pass the alias argument (e.g., alias='customer_db').\n"
        #         "- If a tool returns JSON text, summarize as needed."
        #     ),
        #     tools=FUNCTIONS.definitions,
        # )
        context_instructions = f"""
You are assisting a signed-in user.
- role: {role}
- user_id: {user_id}
Rules:
- When the user says "my", treat it as user_id={user_id}.
- Do NOT ask the user which user they are; you already know.
- If role is "customer", default to alias "customer_db" unless the user explicitly selects another allowed alias.
- If role is "merchant", default to alias "merchant_db".
- Only call discovery tools (db_aliases, db_types, db_names, db_list_by_type) if role is "admin".
- Prefer SELECT statements with named parameters. Keep results small.
"""
        agent = agents_client.create_agent(
            model=MODEL_DEPLOYMENT_NAME,
            name="sql-mcp-agent",
            instructions=context_instructions.strip(),
            tools=FUNCTIONS.definitions,
        )
        print(f"Agent created: {agent.id}")
        thread = agents_client.threads.create()
        print(f"Thread created: {thread.id}")
        while True:
            prompt = input("\nAsk something (or 'quit'): ").strip()
            if prompt.lower() in ("quit", "q", "exit"):
                break
            agents_client.messages.create(thread_id=thread.id, role="user", content=prompt)
            run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id)
            run = poll_until_terminal(agents_client, thread.id, run.id)
            print(f"Run status: {normalize_status(run)}")
            # Show conversation
            try:
                count = 0  # toggles blank-line placement to visually pair messages
                msgs = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)
                print("\nConversation:")
                print("=" * 80)
                for m in msgs:
                    if m.text_messages:
                        for tm in m.text_messages:
                            if count == 0:
                                print(f"\n{m.role.upper()}: {tm.text.value}")
                                count = 1
                            elif count == 1:
                                print(f"{m.role.upper()}: {tm.text.value}\n")
                                count = 0
                print("=" * 80)
            except Exception as e:
                print("⚠️ Could not list messages:", e)
        # Optional cleanup — best effort; ignore failures on shutdown.
        try:
            agents_client.delete_agent(agent.id)
        except Exception:
            pass
# Entry point: run the interactive client only when executed as a script.
if __name__ == "__main__":
    main()
```
client/toolList.py
```
# client.py
import os
import json
import time
import requests
from typing import Any, Dict, Optional, Set, Callable, Tuple
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.ai.agents import AgentsClient
from azure.ai.agents.models import (
FunctionTool,
SubmitToolOutputsAction,
ToolOutput,
RequiredFunctionToolCall,
ListSortOrder,
)
# ========== Load env ==========
load_dotenv()
# Required settings — fail fast with KeyError if any is missing.
PROJECT_ENDPOINT = os.environ["PROJECT_ENDPOINT"]
MODEL_DEPLOYMENT_NAME = os.environ["MODEL_DEPLOYMENT_NAME"]
MCP_SERVER_URL = os.environ["MCP_SERVER_URL"].rstrip("/")
# Verbose logs (optional)
os.environ.setdefault("AZURE_LOG_LEVEL", "warning")
# ========== Railway DB login (role + user_id) ==========
def railway_login() -> Tuple[str, str]:
    """
    Returns (role, user_id) for the current user by querying your Railway DB.
    You can set credentials for MySQL or Postgres via env vars.
    Defaults:
      - table: users
      - columns: username, password, role, user_id
    Prompts at runtime for username/password (password input is hidden).

    Falls back to demo identities on missing config, bad credentials, or DB errors.
    """
    import getpass  # local import: only needed for interactive login

    # Prompt user — getpass avoids echoing the password to the terminal.
    username = input("Login username: ").strip()
    password = getpass.getpass("Login password: ").strip()
    # Config (override via env if your schema differs)
    table = os.environ.get("AUTH_TABLE", "users")
    col_user = os.environ.get("AUTH_USER_COL", "username")
    col_pass = os.environ.get("AUTH_PASS_COL", "password")
    col_role = os.environ.get("AUTH_ROLE_COL", "role")
    col_userid = os.environ.get("AUTH_ID_COL", "user_id")
    # Determine DB type from env (mysql or pg)
    dialect = (os.environ.get("DB_PROVIDER") or os.environ.get("DB_DIALECT") or "").lower()
    if not dialect:
        # fallback: auto if MYSQL_HOST present -> mysql, elif PG_HOST -> pg
        dialect = "mysql" if os.environ.get("MYSQL_HOST") else ("pg" if os.environ.get("PG_HOST") else "")
    if dialect not in ("mysql", "pg"):
        print("[login] No DB_PROVIDER set (mysql\\pg). Using default role='admin', user_id='test_user'.")
        return ("admin", "test_user")
    # Identifiers (table/columns) come from trusted env config, not user input;
    # user-supplied values are bound as parameters.
    # NOTE: In production use hashed passwords; this demo assumes plain text.
    sql = f"""
        SELECT {col_role}, {col_userid}
        FROM {table}
        WHERE {col_user} = %s AND {col_pass} = %s
        LIMIT 1
    """
    try:
        if dialect == "mysql":
            import mysql.connector  # mysql-connector-python
            conn = mysql.connector.connect(
                host=os.environ["MYSQL_HOST"],
                port=int(os.environ.get("MYSQL_PORT", "3306")),
                user=os.environ["MYSQL_USER"],
                password=os.environ["MYSQL_PASSWORD"],
                database=os.environ["MYSQL_DB"],
            )
        else:  # pg
            import psycopg2  # psycopg2-binary
            conn = psycopg2.connect(
                host=os.environ["PG_HOST"],
                port=int(os.environ.get("PG_PORT", "5432")),
                user=os.environ["PG_USER"],
                password=os.environ["PG_PASSWORD"],
                dbname=os.environ["PG_DB"],
            )
        try:
            with conn.cursor() as cur:
                cur.execute(sql, (username, password))
                row = cur.fetchone()
        finally:
            conn.close()  # always release the connection, even if the query fails
        if not row:
            print("[login] Invalid credentials. Defaulting to role='customer' user_id='1' for demo.")
            return ("customer", "1")
        role, user_id = str(row[0]), str(row[1])
        print(f"[login] Authenticated. role={role} user_id={user_id}")
        return (role, user_id)
    except Exception as ex:
        print(f"[login] DB error ({dialect}), defaulting to admin/test_user: {ex}")
        return ("admin", "test_user")
# ========== Minimal MCP HTTP client (same flow as your toolList.py) ==========
class McpHttpClient:
    """Minimal MCP-over-HTTP client: JSON-RPC requests with optional SSE-framed replies."""

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.sid: Optional[str] = None
        self.headers: Dict[str, str] = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            # x-role / x-user-id set after login
        }

    def update_identity(self, role: str, user_id: str):
        """Update identity headers; call before initialize()"""
        self.headers["x-role"] = role
        self.headers["x-user-id"] = user_id

    def _post(self, payload: Dict[str, Any]) -> "requests.Response":
        # String annotation: avoids evaluating the 'requests' name at class-creation time.
        return requests.post(self.url, headers=self.headers, data=json.dumps(payload), timeout=60)

    @staticmethod
    def _parse_response(text: str) -> Dict[str, Any]:
        """Parse a plain-JSON or SSE-framed JSON-RPC response body.

        FIX: also accept SSE bodies that start directly with "data:" (no
        "event:" line) and "data:" lines without a space after the colon —
        both are valid SSE framings the previous code mis-parsed.
        """
        t = text.strip()
        if t.startswith("event:") or t.startswith("data:"):
            data_lines = [ln for ln in t.splitlines() if ln.startswith("data:")]
            if not data_lines:
                raise ValueError(f"No 'data:' block in SSE: {t[:200]}...")
            # Strip the "data:" field name, then any single leading space(s).
            payload = data_lines[-1][len("data:"):].lstrip()
            return json.loads(payload)
        return json.loads(t)

    def initialize(self):
        """Perform the JSON-RPC initialize handshake and capture the session id."""
        payload = {
            "jsonrpc": "2.0",
            "id": "1",
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "clientInfo": {"name": "agents-bridge-client", "version": "1.0.0"},
                "capabilities": {"roots": {"listChanged": True}, "sampling": {}, "tools": {}}
            }
        }
        r = self._post(payload)
        r.raise_for_status()
        sid = r.headers.get("mcp-session-id")
        if not sid:
            raise RuntimeError("MCP server did not return mcp-session-id in headers.")
        self.sid = sid

    def ready(self):
        """Send the notifications/initialized notification; must follow initialize()."""
        assert self.sid, "Call initialize() first"
        self.headers["mcp-session-id"] = self.sid
        payload = {"jsonrpc": "2.0", "method": "notifications/initialized"}
        self._post(payload)  # ignore body

    def tools_call(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str:
        """
        Call an MCP tool and return a text payload suitable for Agent ToolOutput.
        We coerce MCP results (content=[{type:'json'|'text'}]) into a single string.
        """
        assert self.sid, "Call initialize() first"
        args = arguments if arguments is not None else {}
        payload = {
            "jsonrpc": "2.0",
            "id": "call-1",
            "method": "tools/call",
            "params": {"name": name, "arguments": args}
        }
        r = self._post(payload)
        r.raise_for_status()
        obj = self._parse_response(r.text)
        result = obj.get("result") or {}
        content = result.get("content") or []
        if not content:
            return "[]"
        item = content[0]
        ctype = item.get("type")
        if ctype == "text":
            return item.get("text", "")
        if ctype == "json":
            try:
                return json.dumps(item.get("json"), ensure_ascii=False)
            except Exception:
                return str(item.get("json"))
        return json.dumps(obj, ensure_ascii=False)
# Module-level MCP client; identity headers are attached after login.
_mcp = McpHttpClient(MCP_SERVER_URL)
_mcp_initialized = False  # guards the one-time initialize/ready handshake
def _ensure_mcp_session():
    """Initialize the MCP session on first use; later calls are no-ops."""
    global _mcp_initialized
    if _mcp_initialized:
        return
    _mcp.initialize()
    _mcp.ready()
    _mcp_initialized = True
# ========== Function tools (generalized) ==========
def db_aliases() -> str:
    """JSON array (as a string) of all configured database aliases."""
    _ensure_mcp_session()
    return _mcp.tools_call("db.aliases", {})
def db_types() -> str:
    """JSON array (as a string) of available dialects, e.g. ["mysql","pg"]."""
    _ensure_mcp_session()
    return _mcp.tools_call("db.types", {})
def db_names() -> str:
    """JSON array (as a string) of database names (not aliases)."""
    _ensure_mcp_session()
    return _mcp.tools_call("db.names", {})
def db_list_by_type(type: str, unique: bool = True, includeAliases: bool = False) -> str:
    """List databases of one dialect; optionally unique names and alias info."""
    _ensure_mcp_session()
    request_args = {
        "type": type,
        "unique": unique,
        "includeAliases": includeAliases,
    }
    return _mcp.tools_call("db.listByType", request_args)
def sql_schema(alias: str) -> str:
    """Compact Markdown outline of tables/columns for the aliased database."""
    _ensure_mcp_session()
    return _mcp.tools_call(f"{alias}.sql.schema", {})
def sql_peek(alias: str, maxRowsPerTable: int = 50, as_: str = "markdown") -> str:
    """Sample up to maxRowsPerTable rows per table from the aliased database."""
    _ensure_mcp_session()
    request_args = {"maxRowsPerTable": maxRowsPerTable, "as": as_}
    return _mcp.tools_call(f"{alias}.sql.peek", request_args)
def sql_query(alias: str, sql: str, params: Optional[dict] = None,
              readOnly: bool = True, rowLimit: int = 1000, as_: str = "json") -> str:
    """Run a parameterized (SELECT-oriented) query against the aliased database."""
    _ensure_mcp_session()
    request_args = {
        "sql": sql,
        "params": {} if params is None else params,
        "readOnly": readOnly,
        "rowLimit": rowLimit,
        "as": as_,
    }
    return _mcp.tools_call(f"{alias}.sql.query", request_args)
# ========== Build FunctionTool set ==========
# NOTE: the tool functions above must already exist when this set is built;
# FunctionTool snapshots them at import time.
USER_FUNCTIONS: Set[Callable[..., Any]] = {
    db_aliases,
    db_types,
    db_names,
    db_list_by_type,
    sql_schema,
    sql_peek,
    sql_query,
}
FUNCTIONS = FunctionTool(functions=USER_FUNCTIONS)  # Agent can call these tools
# ========== Run helpers ==========
# Lowercase run states that stop polling (compare via normalize_status).
TERMINAL = {"completed", "failed", "expired", "cancelled"}
def normalize_status(run) -> str:
    """Lowercased status of a run object; tolerates str, enum, or missing status."""
    status = getattr(run, "status", None)
    if status is None:
        return ""
    if hasattr(status, "value"):
        try:
            return str(status.value).lower()
        except Exception:
            pass
    if hasattr(status, "name"):
        try:
            return str(status.name).lower()
        except Exception:
            pass
    return str(status).lower()
def poll_until_terminal(client: AgentsClient, thread_id: str, run_id: str, interval: float = 1.0):
    """Poll a run until terminal, executing requested function tools in between.

    :param client: AgentsClient used for polling and tool-output submission.
    :param thread_id: Thread the run belongs to.
    :param run_id: Run to poll.
    :param interval: Seconds to sleep between polls.
    :return: The final run object (its status is in TERMINAL).
    """
    last_status = None
    while True:
        run = client.runs.get(thread_id=thread_id, run_id=run_id)
        status = normalize_status(run)
        if status != last_status:
            # Log only when the status actually changes.
            print(f"[debug] run status -> {status}")
            last_status = status
        if status in TERMINAL:
            return run
        # Substring check tolerates SDK variations in how the status is rendered.
        if "requires_action" in status and isinstance(getattr(run, "required_action", None), SubmitToolOutputsAction):
            tool_calls = run.required_action.submit_tool_outputs.tool_calls
            outputs = []
            for tc in tool_calls:
                print(f"[debug] tool_call: name={getattr(tc,'name','?')} args={getattr(tc,'arguments',{})}")
                if isinstance(tc, RequiredFunctionToolCall):
                    try:
                        out = FUNCTIONS.execute(tc)  # bridges to MCP HTTP
                    except Exception as ex:
                        # Report failures back to the model rather than aborting.
                        out = f"ERROR executing function '{getattr(tc,'name','?')}': {ex}"
                    outputs.append(ToolOutput(tool_call_id=tc.id, output=out))
            if outputs:
                client.runs.submit_tool_outputs(thread_id=thread_id, run_id=run_id, tool_outputs=outputs)
        time.sleep(interval)
# ========== Main ==========
def main():
    """Interactive client: login, pick a default alias, prime the agent with a schema preview, chat."""
    # 1) Login (Railway DB) -> get role + user_id, bind to MCP headers
    role, user_id = railway_login()
    _mcp.update_identity(role, user_id)  # must be before initialize
    _ensure_mcp_session()  # session created using this identity
    # 2) Discover aliases and pick a default alias for this session (tiny addition)
    try:
        aliases = json.loads(db_aliases())
    except Exception:
        aliases = []
    # Prefer role-specific alias if present; else first available alias
    default_alias = None
    role_l = (role or "").lower()
    if role_l.startswith("customer") and "customer_db" in aliases:
        default_alias = "customer_db"
    elif role_l.startswith("merchant") and "merchant_db" in aliases:
        default_alias = "merchant_db"
    elif aliases:
        default_alias = aliases[0]
    # 3) Get a compact schema preview for the default alias and inject into instructions
    schema_preview = ""
    if default_alias:
        try:
            schema_preview = sql_schema(default_alias)
            # keep the preview short to avoid flooding context (adjust as needed)
            schema_preview = schema_preview[:4000]
        except Exception:
            schema_preview = ""
    # 4) Identity-aware instructions (small change from your original)
    agent_instructions = (
        "You can use the provided tools to answer questions.\n"
        f"- Signed-in identity: role={role}, user_id={user_id}.\n"
        f"- Default database alias to use when not specified: {default_alias}.\n"
        "- Do NOT ask for credentials; the user is already authenticated.\n"
        "- When the user says \"my ...\", interpret it with this identity (user_id above).\n"
        "- Use db_aliases/db_types/db_names/db_list_by_type to discover databases if needed.\n"
        "- When inspecting or querying a specific database, call sql_schema/peek/query and "
        "pass the alias argument (use the default alias unless the user specifies another).\n"
        "- If a tool returns JSON text, summarize as needed.\n"
        "\n"
        "### Schema overview (default alias)\n"
        f"{schema_preview}\n"
    )
    # 5) Azure Agents client
    agents_client = AgentsClient(
        endpoint=PROJECT_ENDPOINT,
        credential=DefaultAzureCredential(
            exclude_environment_credential=True,
            exclude_managed_identity_credential=True,
        ),
    )
    # 6) Create agent with generalized function tools + identity-aware instructions
    with agents_client:
        agent = agents_client.create_agent(
            model=MODEL_DEPLOYMENT_NAME,
            name="sql-mcp-bridge-agent",
            instructions=agent_instructions,
            tools=FUNCTIONS.definitions,
        )
        print(f"Agent created: {agent.id}")
        thread = agents_client.threads.create()
        print(f"Thread created: {thread.id}")
        while True:
            prompt = input("\nAsk something (or 'quit'): ").strip()
            if prompt.lower() in ("quit", "q", "exit"):
                break
            agents_client.messages.create(thread_id=thread.id, role="user", content=prompt)
            run = agents_client.runs.create(thread_id=thread.id, agent_id=agent.id)
            run = poll_until_terminal(agents_client, thread.id, run.id)
            print(f"Run status: {normalize_status(run)}")
            # Show conversation
            try:
                msgs = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)
                print("\nConversation:")
                print("-" * 60)
                for m in msgs:
                    if m.text_messages:
                        for tm in m.text_messages:
                            print(f"{m.role.upper()}: {tm.text.value}")
                print("-" * 60)
            except Exception as e:
                print("⚠️ Could not list messages:", e)
        # Optional cleanup — best effort; ignore failures on shutdown.
        try:
            agents_client.delete_agent(agent.id)
        except Exception:
            pass
# Script entry point.
if __name__ == "__main__":
    main()
```
src/db/index.ts
```
/* eslint-disable @typescript-eslint/no-explicit-any */
/**
* src/db/index.ts
*
* Minimal, env-driven DB provider selector.
 * - Resolves from DB_PROVIDER or DB_DIALECT (or DATABASE_URL scheme); throws if none resolves to a supported dialect.
* - Normalizes synonyms (postgres/postgresql -> pg, mariadb -> mysql, sqlserver -> mssql, sqlite3 -> sqlite).
* - Prefers factory exports (newDb/createDb/default()) else falls back to singletons (pgDb/mysqlDb/...).
*/
import type { DB } from "./provider.js";
type CanonicalDialect = "pg" | "mysql" | "mssql" | "oracle";
/** Lowercased spellings accepted for each supported dialect. */
const DIALECT_SYNONYMS: Record<string, CanonicalDialect> = {
  // Postgres family
  pg: "pg", postgres: "pg", postgresql: "pg", psql: "pg",
  // MySQL family
  mysql: "mysql", mariadb: "mysql", maria: "mysql",
  // SQL Server family
  mssql: "mssql", "ms-sql": "mssql", sqlserver: "mssql", "sql-server": "mssql",
  // Oracle family
  oracle: "oracle", oracledb: "oracle", oci: "oracle",
};
/** Normalize a user-supplied dialect spelling to its canonical form (or undefined). */
function canonicalizeDialect(input?: string | null): CanonicalDialect | undefined {
  if (!input) return undefined;
  const normalized = String(input).trim().toLowerCase();
  return DIALECT_SYNONYMS[normalized];
}
/** Infer a canonical dialect from a DATABASE_URL scheme; undefined for non-URLs. */
function dialectFromDatabaseUrl(url?: string): CanonicalDialect | undefined {
  if (!url) return undefined;
  let scheme: string;
  try {
    scheme = new URL(url).protocol.replace(":", "").toLowerCase();
  } catch {
    // Non-URL strings (JDBC-ish, ADO connection strings, …) — no sqlite guess.
    return undefined;
  }
  // Map only to supported dialects via synonyms (pg/mysql/mssql/oracle).
  return DIALECT_SYNONYMS[scheme];
}
/** Resolve the dialect from DB_PROVIDER, then DB_DIALECT, then the DATABASE_URL scheme. */
function resolveDialectFromEnv(env = process.env): CanonicalDialect {
  const resolved =
    canonicalizeDialect(env.DB_PROVIDER) ??
    canonicalizeDialect(env.DB_DIALECT) ??
    dialectFromDatabaseUrl(env.DATABASE_URL);
  if (resolved) return resolved;
  throw new Error(
    "Unable to resolve DB dialect from env. " +
    "Please set DB_PROVIDER/DB_DIALECT/DATABASE_URL for a supported dialect (pg/mysql/mssql/oracle)."
  );
}
/**
 * Attach the canonical dialect hint on the db object.
 * FIX: the descriptor now sets writable/configurable (defineProperty defaults
 * both to false), so a later re-annotation with a different dialect neither
 * throws from defineProperty nor hits a strict-mode assignment error.
 */
function annotateDialect<T extends object>(db: T, dialect: CanonicalDialect): T & { dialect: CanonicalDialect } {
  if (!db) return { dialect } as any;
  if ((db as any).dialect !== dialect) {
    try {
      Object.defineProperty(db as any, "dialect", {
        value: dialect,
        enumerable: true,
        writable: true,      // allow later plain assignment
        configurable: true,  // allow later redefinition
      });
    } catch {
      (db as any).dialect = dialect;
    }
  }
  return db as any;
}
/** Prefer factory exports if available, else fall back to well-known singleton names. */
function materializeDb(mod: any, dialect: CanonicalDialect): DB {
  const finish = (candidate: any): DB => annotateDialect(candidate, dialect);
  // 1) Factory exports win, in priority order (newDb, createDb, callable default).
  for (const factory of ["newDb", "createDb", "default"]) {
    const fn = mod?.[factory];
    if (typeof fn === "function") return finish(fn());
  }
  // 2) A default export that is already a db-like object.
  const defaultExport = mod?.default;
  if (defaultExport && typeof defaultExport === "object" && typeof defaultExport.query === "function") {
    return finish(defaultExport);
  }
  // 3) Well-known singleton export names per dialect.
  const knownSingletons: Record<CanonicalDialect, string[]> = {
    pg: ["pgDb", "db"],
    mysql: ["mysqlDb", "db"],
    mssql: ["mssqlDb", "db"],
    oracle: ["oracleDb", "db"],
  };
  for (const exportName of knownSingletons[dialect]) {
    const candidate = mod?.[exportName];
    if (candidate && typeof candidate.query === "function") return finish(candidate);
  }
  // 4) Last resort: any exported object exposing query().
  for (const exportName of Object.keys(mod ?? {})) {
    const candidate = mod[exportName];
    if (candidate && typeof candidate === "object" && typeof candidate.query === "function") {
      return finish(candidate);
    }
  }
  throw new Error(
    `Provider module for '${dialect}' does not expose a usable DB export. ` +
    `Expected a factory (newDb/createDb/default()) or a singleton (e.g., ${dialect}Db). ` +
    `Exports: [${Object.keys(mod ?? {}).join(", ")}]`
  );
}
/** Lazily import the provider module for a given canonical dialect. */
async function loadModule(dialect: CanonicalDialect): Promise<any> {
  if (dialect === "pg") return import("./providers/postgres.js");
  if (dialect === "mysql") return import("./providers/mysql.js");
  if (dialect === "mssql") return import("./providers/mssql.js");
  if (dialect === "oracle") return import("./providers/oracle.js");
  // Unreachable while CanonicalDialect stays a four-way union; guards future edits.
  throw new Error(`Unsupported dialect: ${dialect}`);
}
/**
 * Public API: get a DB instance based on current env.
 * Env is resolved first so the provider module is imported only afterwards,
 * and a factory is preferred over a singleton when both exist.
 */
export async function getDb(): Promise<DB> {
  const dialect = resolveDialectFromEnv(process.env);
  const providerModule = await loadModule(dialect);
  return materializeDb(providerModule, dialect) as DB;
}
/** Optional helper (e.g., to populate an X-DB-Dialect response header). */
export function getResolvedDialect(): CanonicalDialect {
  const dialect = resolveDialectFromEnv(process.env);
  return dialect;
}
```
src/db/provider.ts
```
// All dialects the abstraction can represent. NOTE(review): the env-driven
// selector in index.ts only resolves pg/mysql/mssql/oracle; sqlite appears via
// the YAML registry path — confirm both stay in sync when adding dialects.
export type Dialect = "sqlite" | "pg" | "mysql" | "mssql" | "oracle";
export interface DB {
  /** Canonical dialect hint attached by the provider selector/registry. */
  dialect: Dialect;
  /**
   * Execute a parameterized query.
   * @param text SQL with placeholders appropriate for the driver
   * @param params Parameter values (array or named object based on driver)
   */
  query<T = unknown>(text: string, params: any): Promise<{ rows: T[]; rowCount: number }>;
  /** Optional: release driver resources; may be sync or async. */
  close?(): Promise<void> | void;
}
```
src/db/registry.ts
```
// src/db/registry.ts
import fs from "node:fs";
import * as yaml from "js-yaml";
import { getDb } from "./index.js";
import type { DB, Dialect } from "./provider.js";
/**
* NOTE: This version adds:
* - ${ENV} and ${ENV:default} expansion for all string fields in dbs.yaml
* - "enabled: false" support to skip entries explicitly
* - Graceful skip of entries whose required envs are missing/blank
* - Light type coercion (e.g., port -> number)
*/
/** One entry in dbs.yaml, discriminated by `dialect`. */
export type DbEntry =
  | ({
      alias: string;
      enabled?: boolean;  // set false to skip this entry explicitly
      dialect: "mssql";
      host: string; port?: number; user: string; password: string; database: string;
      options?: Record<string, any>;  // extra driver options, forwarded as JSON
    })
  | ({
      alias: string;
      enabled?: boolean;
      dialect: "mysql";
      host: string; port?: number; user: string; password: string; database: string;
    })
  | ({
      alias: string;
      enabled?: boolean;
      dialect: "pg";
      host: string; port?: number; user: string; password: string; database: string;
    })
  | ({
      alias: string;
      enabled?: boolean;
      dialect: "oracle";
      connectString: string; user: string; password: string;  // host/port live in connectString
    })
  | ({
      alias: string;
      enabled?: boolean;
      dialect: "sqlite";
      file: string;  // path to the database file
    });
/** Shape of the parsed dbs.yaml document. */
export interface DbConfigFile {
  databases: DbEntry[];
}
/** Display/diagnostic metadata kept per alias (e.g., for a /dbs listing). */
export interface DbAliasMeta {
  alias: string;
  dialect: Dialect; // "mysql" | "pg" | "mssql" | "oracle" | "sqlite"
  databaseName: string; // what you want to show on /dbs
  host?: string;
  port?: number;
  connectString?: string;
  file?: string;
}
/** ------------------------------------------------------------------ */
/** ENV EXPANSION HELPERS: ${NAME} or ${NAME:default} in YAML strings. */
/** ------------------------------------------------------------------ */
/** Expand ${NAME} / ${NAME:default}; unset or blank vars use the default (or ""). */
function expandEnvInString(str: string): string {
  const PLACEHOLDER = /\$\{([A-Z0-9_]+)(?::([^}]*))?\}/gi;
  return str.replace(PLACEHOLDER, (_match, name: string, fallback?: string) => {
    const value = process.env[name];
    // Empty string counts as "unset" so defaults still apply (and blanks skip later).
    return value === undefined || value === "" ? fallback ?? "" : value;
  });
}
/** Recursively apply env expansion to every string inside an arbitrary value. */
function deepExpand<T>(obj: T): T {
  if (obj == null) return obj;
  if (typeof obj === "string") return expandEnvInString(obj) as unknown as T;
  if (Array.isArray(obj)) return obj.map(deepExpand) as unknown as T;
  if (typeof obj === "object") {
    const result: Record<string, unknown> = {};
    for (const key of Object.keys(obj as object)) {
      result[key] = deepExpand((obj as any)[key]);
    }
    return result as unknown as T;
  }
  return obj;
}
/** Coerce common field types (env expansion leaves numbers as strings, e.g. port). */
function coerceTypesInPlace(entry: any) {
  const port = entry?.port;
  if (typeof port === "string") {
    const parsed = Number(port);
    if (Number.isFinite(parsed)) entry.port = parsed;
  }
  return entry;
}
/** True when x is a string with at least one non-whitespace character. */
function isNonEmptyString(x: unknown): x is string {
  if (typeof x !== "string") return false;
  return x.trim().length > 0;
}
/** Figure out missing required keys per dialect for a given entry. */
function getMissingKeys(entry: any): string[] {
  const REQUIRED_BY_DIALECT: Record<string, string[]> = {
    mssql: ["alias", "dialect", "host", "user", "password", "database"],
    mysql: ["alias", "dialect", "host", "user", "password", "database"],
    pg: ["alias", "dialect", "host", "user", "password", "database"],
    oracle: ["alias", "dialect", "connectString", "user", "password"],
    sqlite: ["alias", "dialect", "file"],
  };
  const required = REQUIRED_BY_DIALECT[entry?.dialect as string];
  // Unknown or absent dialect: the discriminator itself is the missing field.
  if (!required) return ["dialect"];
  return required.filter((key) => !isNonEmptyString(entry[key]));
}
/** ---------------------------------------------------------- */
/** Your existing helpers: clear DB env, patch, scoped getDb(). */
/** ---------------------------------------------------------- */
/** Hard-clear DB-related env before each alias to prevent config bleed between aliases. */
function clearDbEnv(env = process.env) {
  const exactKeys = [
    "DB_PROVIDER",
    "DB_DIALECT",
    "DATABASE_URL",
    "SQLITE_FILE",
    "SQLITE_PATH",
  ];
  const prefixPatterns = [
    /^PG/i,
    /^POSTGRES/i,
    /^MYSQL/i,
    /^MSSQL/i,
    /^SQLSERVER/i,
    /^ORACLE/i,
    /^ORACLE_DB/i,
    /^ORACLEDB/i,
    /^OCI/i,
    /^SQLITE/i,
  ];
  exactKeys.forEach((key) => delete env[key]);
  Object.keys(env)
    .filter((key) => prefixPatterns.some((rx) => rx.test(key)))
    .forEach((key) => delete env[key]);
}
/**
 * Run `fn` with `patch` applied to process.env, restoring prior values afterwards.
 * FIX: made async with try/finally so the env is also restored when `fn` throws
 * synchronously — previously fn() threw before .finally() was attached, leaving
 * the patched values in process.env.
 */
async function withEnv<T>(patch: Record<string, string>, fn: () => Promise<T>): Promise<T> {
  const prev: Record<string, string | undefined> = {};
  for (const [k, v] of Object.entries(patch)) {
    prev[k] = process.env[k];
    process.env[k] = v;
  }
  try {
    return await fn();
  } finally {
    for (const [k, v] of Object.entries(prev)) {
      if (v === undefined) delete process.env[k];
      else process.env[k] = v;
    }
  }
}
/** Translate one validated DbEntry into the env vars the provider modules read. */
function envPatchFor(entry: DbEntry): Record<string, string> {
  switch (entry.dialect) {
    case "mssql": {
      const { host, user, password, database } = entry;
      const port = String(entry.port ?? 1433);
      // ADO-style connection string. TrustServerCertificate=true is OK for dev;
      // for prod consider false with proper certs.
      const connParts = [
        `Server=${host},${port}`,
        `Database=${database}`,
        `User Id=${user}`,
        `Password=${password}`,
        `Encrypt=true`,
        `TrustServerCertificate=true`,
      ];
      const patch: Record<string, string> = {
        DB_PROVIDER: "mssql",
        DB_DIALECT: "mssql",
        DATABASE_URL: connParts.join(";") + ";",
        MSSQL_SERVER: host,
        MSSQL_HOST: host,
        MSSQL_PORT: port,
        MSSQL_USER: user,
        MSSQL_PASSWORD: password,
        MSSQL_DATABASE: database,
      };
      const extraOptions = (entry as any).options;
      if (extraOptions) patch.MSSQL_OPTS_JSON = JSON.stringify(extraOptions);
      return patch;
    }
    case "mysql": {
      const { host, database } = entry;
      const port = String(entry.port ?? 3306);
      // Credentials are URL-encoded only inside DATABASE_URL; the discrete vars stay raw.
      const url = `mysql://${encodeURIComponent(entry.user)}:${encodeURIComponent(entry.password)}@${host}:${port}/${database}`;
      return {
        DB_PROVIDER: "mysql",
        DB_DIALECT: "mysql",
        DATABASE_URL: url,
        MYSQL_HOST: host,
        MYSQL_PORT: port,
        MYSQL_USER: entry.user,
        MYSQL_PASSWORD: entry.password,
        MYSQL_DATABASE: database,
      };
    }
    case "pg": {
      const { host, database } = entry;
      const port = String(entry.port ?? 5432);
      const url = `postgres://${encodeURIComponent(entry.user)}:${encodeURIComponent(entry.password)}@${host}:${port}/${database}`;
      return {
        DB_PROVIDER: "pg",
        DB_DIALECT: "pg",
        DATABASE_URL: url,
        PGHOST: host,
        PGPORT: port,
        PGUSER: entry.user,
        PGPASSWORD: entry.password,
        PGDATABASE: database,
      };
    }
    case "oracle": {
      const { user, password, connectString } = entry;
      return {
        DB_PROVIDER: "oracle",
        DB_DIALECT: "oracle",
        DATABASE_URL: `${user}/${password}@${connectString}`,
        ORACLE_CONNECT_STRING: connectString,
        ORACLE_USER: user,
        ORACLE_PASSWORD: password,
      };
    }
    case "sqlite": {
      return {
        DB_PROVIDER: "sqlite",
        DB_DIALECT: "sqlite",
        SQLITE_FILE: entry.file,
        SQLITE_PATH: entry.file,
      };
    }
  }
}
/**
 * Build the alias -> DB registry from a dbs.yaml file.
 *
 * Pipeline: read file -> parse YAML -> expand ${ENV[:default]} placeholders ->
 * expand comma-separated list fields into concrete entries -> validate each ->
 * construct each DB under an isolated env (clearDbEnv + withEnv) so one alias's
 * settings never bleed into the next.
 *
 * @param path Path to the dbs.yaml file.
 * @returns registry (alias -> DB), meta (alias -> display info), and closeAll().
 */
export async function loadDbRegistryFromYaml(path: string): Promise<{
  registry: Map<string, DB>;
  meta: Map<string, DbAliasMeta>;
  closeAll: () => Promise<void>;
}> {
  const raw = fs.readFileSync(path, "utf8");
  // 1) Parse YAML
  const parsed = yaml.load(raw) as DbConfigFile;
  // 2) Expand ${ENV} placeholders across all strings
  const cfg = deepExpand(parsed) as DbConfigFile;
  const list = cfg?.databases ?? [];
  if (!list.length) throw new Error(`No databases in ${path}`);
  const registry = new Map<string, DB>();
  const meta = new Map<string, DbAliasMeta>();
  // small helper – works on Windows and POSIX
  const basename = (p?: string) =>
    (p ?? "").split(/[\\/]/).filter(Boolean).pop() ?? "(sqlite)";
  for (const rawEntry of list) {
    // "enabled: false" lets an entry be skipped explicitly.
    if ((rawEntry as any)?.enabled === false) {
      console.warn(`[db] Skipping '${(rawEntry as any).alias ?? "?"}' (enabled=false).`);
      continue;
    }
    // Coerce obvious scalar types before expansion
    const coerced = coerceTypesInPlace({ ...rawEntry }) as DbEntry;
    // Expand one YAML item into N concrete entries (lists -> variants)
    const variants = expandDbEntry(coerced);
    for (const entry of variants) {
      // Validate this concrete entry; entries with unresolved env vars are skipped.
      const missing = getMissingKeys(entry as any);
      if (missing.length > 0) {
        console.warn(
          `[db] Skipping alias='${(entry as any).alias ?? "?"}' (dialect='${(entry as any).dialect ?? "?"}'): ` +
          `missing env/fields: ${missing.join(", ")}`
        );
        continue;
      }
      // ---- Compute displayable database name for this alias (per dialect) ----
      let databaseName = "";
      switch (entry.dialect) {
        case "mysql":
        case "pg":
        case "mssql":
          databaseName = (entry as any).database ?? "";
          break;
        case "oracle": {
          const cs = (entry as any).connectString ?? "";
          // Use everything after the final "/" as the service name, else the raw connect string
          databaseName = cs.includes("/") ? cs.split("/").pop()! : cs;
          break;
        }
        case "sqlite": {
          const f = (entry as any).file ?? (entry as any).path ?? "";
          databaseName = f ? basename(f) : "(sqlite)";
          break;
        }
        default:
          databaseName = (entry as any).database ?? "";
      }
      if (!databaseName) databaseName = "(unknown)";
      // Store meta for this alias
      meta.set(entry.alias, {
        alias: entry.alias,
        dialect: entry.dialect,
        databaseName,
        host: (entry as any).host,
        port: (entry as any).port,
        connectString: (entry as any).connectString,
        file: (entry as any).file ?? (entry as any).path,
      });
      // Build and store DB with isolated env per alias.
      // Order matters: wipe all DB env first, then apply only this entry's patch.
      clearDbEnv();
      const patch = envPatchFor(entry);
      const db = await withEnv(patch, async () => await getDb());
      if (registry.has(entry.alias)) {
        console.error(`[db] Duplicate alias '${entry.alias}' – previous entry will be overwritten.`);
      }
      registry.set(entry.alias, db);
    }
  }
  if (registry.size === 0) {
    console.warn(`[db] No usable database entries after expansion/validation from ${path}.`);
  }
  async function closeAll() {
    // Sequentially close every provider that exposes close().
    for (const db of registry.values()) {
      await db.close?.();
    }
  }
  return { registry, meta, closeAll };
}
// Utility: split a comma-separated env value into a string[], trim blanks.
function splitList(v: unknown): string[] {
  if (v == null) return [];
  const parts: string[] = [];
  for (const piece of String(v).split(",")) {
    const trimmed = piece.trim();
    if (trimmed) parts.push(trimmed);
  }
  return parts;
}
/**
 * Expand a single raw DbEntry that may contain comma-separated fields
 * (alias, host, port, user, password, database, connectString)
 * into N concrete entries by zipping/broadcasting those lists.
 */
function expandDbEntry<T extends Record<string, any>>(raw: T): T[] {
  // Fields that may hold comma-separated lists (or YAML arrays).
  const MULTI_KEYS = [
    "alias",
    "host",
    "port",
    "user",
    "password",
    "database",
    "connectString",
  ] as const;
  // Normalize one field value to a string list: arrays map through String,
  // scalars are comma-split with blanks dropped, null/undefined become [].
  const toList = (value: unknown): string[] => {
    if (Array.isArray(value)) return value.map(String);
    if (value == null) return [];
    return String(value)
      .split(",")
      .map((part) => part.trim())
      .filter(Boolean);
  };
  // Collect per-field lists and the variant count (longest list wins).
  const lists: Record<string, string[]> = {};
  let variantCount = 1;
  for (const key of MULTI_KEYS) {
    const items = toList(raw[key]);
    if (!items.length) continue;
    lists[key] = items;
    variantCount = Math.max(variantCount, items.length);
  }
  const variants: T[] = [];
  for (let idx = 0; idx < variantCount; idx++) {
    const variant: any = { ...raw };
    // Zip the lists together; shorter lists broadcast their final element.
    for (const [key, items] of Object.entries(lists)) {
      variant[key] = items[Math.min(idx, items.length - 1)];
    }
    // Keep aliases unique when one YAML entry fans out into several variants.
    if (variantCount > 1) {
      const baseAlias = variant.alias ?? raw.dialect ?? "db";
      const aliasIsList = (lists.alias?.length ?? 0) > 1;
      variant.alias = aliasIsList
        ? baseAlias
        : idx === 0
          ? baseAlias
          : `${baseAlias}_${idx + 1}`;
    }
    // Ports arrive as strings after splitting; coerce back to numbers.
    if (variant.port !== undefined) {
      const parsed = Number(variant.port);
      if (!Number.isNaN(parsed)) variant.port = parsed;
    }
    variants.push(variant);
  }
  return variants;
}
```
src/db/providers/mssql.ts:
```
// src/db/providers/mssql.ts
import mssql from 'mssql';
import type { DB } from '../provider.js';
/**
 * SQL Server / Azure SQL provider backed by a lazily-created mssql
 * ConnectionPool. DATABASE_URL supplies the ADO-style connection string
 * (set per-alias by the registry's env patching).
 */
export default function createMssqlDb(): DB {
  const connectionString = process.env.DATABASE_URL!;
  let pool: mssql.ConnectionPool | null = null;
  let connectPromise: Promise<mssql.ConnectionPool> | null = null;
  // Lazily connect and cache the pool. Concurrent callers share one in-flight
  // connect; a failed connect clears the cache so the next call retries.
  async function getPool(): Promise<mssql.ConnectionPool> {
    if (pool?.connected) return pool;
    // FIX: if the cached pool has dropped its connection, discard BOTH the
    // pool and the already-resolved promise; otherwise every later call would
    // keep returning the dead pool forever.
    if (pool && !pool.connected) {
      pool = null;
      connectPromise = null;
    }
    if (!connectPromise) {
      connectPromise = new mssql.ConnectionPool(connectionString)
        .connect()
        .then(p => { pool = p; return p; })
        .catch(err => { connectPromise = null; throw err; });
    }
    return connectPromise;
  }
  return {
    dialect: 'mssql',
    async query(text, params?: any) {
      const p = await getPool();
      const req = p.request();
      // Support both array and object parameters
      if (params) {
        if (Array.isArray(params)) {
          // Accept either positional values OR {name, value} objects
          let posIndex = 0;
          for (const v of params) {
            if (v && typeof v === 'object' && 'name' in v) {
              req.input(String((v as any).name), (v as any).value as any);
            } else {
              // Positional values bind as @p1, @p2, ... (matches the mapper).
              req.input(`p${++posIndex}`, v as any);
            }
          }
        } else if (typeof params === 'object') {
          for (const [k, v] of Object.entries(params)) {
            req.input(k, v as any);
          }
        }
      }
      const result = await req.query(text);
      const rows = result.recordset ?? [];
      return { rows, rowCount: Array.isArray(rows) ? rows.length : 0 };
    },
    async close() {
      try { await pool?.close(); }
      finally { pool = null; connectPromise = null; }
    },
  };
}
```
src/db/providers/mysql.ts:
```
import mysql from 'mysql2/promise';
import type { DB } from '../provider.js';
// MySQL/MariaDB provider backed by a mysql2 connection pool created from
// DATABASE_URL (mysql://user:pass@host:port/db).
export default function createMysqlDb(): DB {
  const connectionUrl = process.env.DATABASE_URL!;
  const pool = mysql.createPool(connectionUrl);
  return {
    dialect: 'mysql',
    async query(sql, params) {
      const [result] = await pool.query(sql, params);
      const rows = result as any[];
      return {
        rows,
        rowCount: Array.isArray(rows) ? rows.length : 0,
      };
    },
    async close() {
      await pool.end();
    },
  };
}
```
src/db/providers/oracle.ts:
```
// src/db/providers/oracle.ts
import oracledb from 'oracledb';
import type { DB } from '../provider.js';
// Parse an EZCONNECT-style DATABASE_URL of the form user/password@connect
// (e.g. "scott/tiger@host:1521/service"). Returns null when the string does
// not match, letting the caller fall back to ORACLE_* env vars.
function parseEzConnect(url: string) {
  const match = /^([^/]+)\/([^@]+)@(.+)$/.exec(url);
  if (match === null) return null;
  return { user: match[1], password: match[2], connectString: match[3] };
}
// Oracle has no bare "SELECT 1"; rewrite that exact connectivity probe to use
// DUAL. Every other statement passes through untouched.
function normalizeSql(sql: string): string {
  const isSelectOne = /^\s*select\s+1\s*;?\s*$/i.test(sql);
  if (isSelectOne) return 'SELECT 1 AS "OK" FROM DUAL';
  return sql;
}
// Oracle provider. Credentials resolve from ORACLE_* env vars first and fall
// back to parsing DATABASE_URL as EZCONNECT (user/password@host:port/service).
export default function createOracleDb(): DB {
  const rawUrl = process.env.DATABASE_URL!;
  const parsed = parseEzConnect(rawUrl) ?? {};
  const user = process.env.ORACLE_USER ?? (parsed as any).user;
  const password = process.env.ORACLE_PASSWORD ?? (parsed as any).password;
  const connectString = process.env.ORACLE_CONNECT_STRING ?? (parsed as any).connectString;
  if (!user || !password || !connectString) {
    throw new Error('Oracle config missing: user/password/connectString');
  }
  let pool: oracledb.Pool | null = null;
  let poolPromise: Promise<oracledb.Pool> | null = null;
  // Lazily create one shared pool; concurrent callers share the in-flight
  // promise, and a failed attempt resets it so the next query retries.
  async function getPool(): Promise<oracledb.Pool> {
    if (pool) return pool;
    if (poolPromise) return poolPromise;
    poolPromise = oracledb
      .createPool({
        user,
        password,
        connectString,
        // Pool tuning knobs (poolMin, poolMax, stmtCacheSize, ...) go here.
      })
      .then(created => {
        pool = created;
        return created;
      })
      .catch(err => {
        poolPromise = null;
        throw err;
      });
    return poolPromise;
  }
  return {
    dialect: 'oracle',
    async query(text, params?: any) {
      const activePool = await getPool();
      const conn = await activePool.getConnection();
      try {
        const sql = normalizeSql(text);
        const bind = params ?? {};
        const res = await conn.execute(sql, bind, { outFormat: oracledb.OUT_FORMAT_OBJECT });
        const rows = (res.rows as any[]) ?? [];
        return { rows, rowCount: rows.length };
      } finally {
        // Always return the connection to the pool, even on error.
        await conn.close();
      }
    },
    async close() {
      try {
        // drainTime 0: close immediately without waiting for busy connections.
        await pool?.close(0);
      } finally {
        pool = null;
        poolPromise = null;
      }
    },
  };
}
```
src/db/providers/postgres.ts
```
// src/db/providers/postgres.ts
import { Pool } from 'pg';
import type { DB } from '../provider.js';
// PostgreSQL provider backed by a pg Pool created from DATABASE_URL.
export default function createPostgresDb(): DB {
  const pool = new Pool({ connectionString: process.env.DATABASE_URL! });
  return {
    dialect: 'pg',
    async query(text, params?: any) {
      // Named-parameter mapping (:name -> $1, $2, ...) happens upstream, so
      // only positional arrays are forwarded to the driver.
      const values = Array.isArray(params) ? params : undefined;
      const res = await pool.query(text, values);
      return { rows: res.rows, rowCount: res.rowCount ?? res.rows.length };
    },
    async close() {
      await pool.end();
    },
  };
}
```
src/server/http.ts:
```
import "dotenv/config";
import express from "express";
import type { Request, Response } from "express";
import { loadDbRegistryFromYaml } from "../db/registry.js";
import type { DB } from "../db/provider.js";
import type { DbAliasMeta } from "../db/registry.js";
import { mapNamedToDriver } from "../db/paramMap.js";
import { randomUUID } from "node:crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { registerSqlTools } from "../tools/sql/index.js";
// NEW: RBAC policy
import { evaluatePolicyFromFile } from "../policy/index.js";
import { evaluateToolsPolicyFromFile } from "../policy/index.js";
// Express app with JSON body parsing (handles MCP messages and /sql/query bodies).
const app = express();
app.use(express.json());
// HTTP port; override with the PORT env var.
const PORT = Number(process.env.PORT ?? 8787);
// ——— DB registry state ———
// Populated once at boot by the async IIFE at the bottom of this file.
type Row = Record<string, any>;
let registry: Map<string, DB> = new Map();
let meta: Map<string, DbAliasMeta> = new Map();
let closeAll: () => Promise<void> = async () => {};
// ——— Helper: log ———
// One-line trace of each MCP HTTP request: verb, session id, JSON-RPC method.
function logReq(method: string, req: Request) {
  const sessionId = req.header?.("mcp-session-id") ?? "(none)";
  const rpcMethod = (req as any).body?.method ?? "(n/a)";
  console.log(`[MCP] ${method} sid=${sessionId} bodyMethod=${rpcMethod}`);
}
// ——— Session with RBAC ———
// Per-session state: a dedicated MCP server + transport pair, the caller's
// roles, and the alias allowlist computed from the policy file at creation.
type Session = {
  server: McpServer;
  transport: StreamableHTTPServerTransport;
  createdAt: number;
  lastSeenAt: number; // refreshed by touch(); drives the idle evictor below
  user: { id?: string; roles: string[] };
  allowedAliases: string[];
};
const sessions = new Map<string, Session>();
// Idle sessions are evicted after this many ms (values <= 0 disable eviction).
const SESSION_TTL_MS = Number(process.env.MCP_SESSION_TTL_MS ?? 30 * 60 * 1000);
// How often the evictor scans the session map.
const EVICT_EVERY_MS = 60 * 1000;
// Roles come from the comma-separated X-Role header.
// NOTE(review): a missing/empty header grants "admin" — convenient for dev,
// but confirm this default before exposing the server publicly.
function rolesFromReq(req: Request): string[] {
  const header = req.header("x-role") ?? "";
  const parsed = header
    .split(",")
    .map((role) => role.trim())
    .filter(Boolean);
  return parsed.length > 0 ? parsed : ["admin"];
}
// Resolve the session for a request. Answers 400 and returns null when the
// header is missing; otherwise returns the sid plus the session record,
// which is undefined for unknown/expired ids (caller decides how to respond).
function requireSession(req: Request, res: Response): { sid: string; s?: Session } | null {
  const sid = req.header("mcp-session-id") ?? "";
  if (sid === "") {
    res.status(400).send("Invalid or missing mcp-session-id");
    return null;
  }
  return { sid, s: sessions.get(sid) };
}
function touch(sid: string) {
const s = sessions.get(sid);
if (s) s.lastSeenAt = Date.now();
}
// Create a session restricted to allowed aliases
// Builds a dedicated McpServer whose tool set reflects the caller's roles
// (X-Role header) per the policy file, wires it to a fresh streamable
// transport, and records the session once the SDK assigns its id.
async function createSession(req: Request): Promise<StreamableHTTPServerTransport> {
  const server = new McpServer({ name: "mcp-sql", version: "0.2.0" });
  // Which aliases this user can access
  const roles = rolesFromReq(req);
  const allAliases = Array.from(registry.keys());
  const policyPath = process.env.POLICY_FILE ?? "./policies.yaml";
  const { allowedAliases } = evaluatePolicyFromFile(policyPath, { roles, allAliases });
  // Per-alias tool + data policy
  const policies = evaluateToolsPolicyFromFile(policyPath, { roles, aliases: allowedAliases });
  // Discovery tools: admin-only when X-Role is present; open when no role header
  const hasRoleHeader = !!req.header("x-role");
  const isAdmin = roles.includes("admin");
  // const discoveryVisible = hasRoleHeader ? isAdmin : true;
  // Always expose discovery tools; their results are already filtered to the session’s allowed aliases.
  const discoveryVisible = true;
  // User identity (for :user_id in rowFilters)
  const userId = req.header("x-user-id") ?? undefined;
  // Register aliases with the policy and user context
  for (const alias of allowedAliases) {
    const db = registry.get(alias)!;
    const p = policies[alias]; // may be undefined
    // Data policies only bind for non-admin users who sent a role header.
    const applyDataPolicy = hasRoleHeader && !isAdmin && !!p;
    registerSqlTools(server, {
      db,
      auditPath: process.env.SQL_AUDIT_LOG,
      ns: alias,
      meta,
      registry,
      tools: p ? p.tools : undefined,
      dataPolicy: applyDataPolicy
        ? { readOnly: p!.readOnly, tableAllow: p!.tableAllow, rowFilters: p!.rowFilters }
        : undefined,
      userContext: applyDataPolicy ? { user_id: userId } : undefined,
      discoveryVisible,
    });
  }
  const transport = new StreamableHTTPServerTransport({
    sessionIdGenerator: () => randomUUID(),
    // Called by the SDK after the initialize handshake — only then is the
    // session id known, so the session record is stored here.
    onsessioninitialized: (sid: string) => {
      sessions.set(sid, {
        server,
        transport,
        createdAt: Date.now(),
        lastSeenAt: Date.now(),
        user: { roles },
        allowedAliases,
      });
      console.log(`[MCP] session initialized: ${sid}, roles=${roles.join(",")}, aliases=${allowedAliases.join("|")}`);
    },
  });
  await server.connect(transport);
  return transport;
}
// ——— REST endpoints ———
// Liveness probe.
app.get("/health", (_req, res) => res.status(200).send("ok"));
// Unique database display names across all aliases, sorted.
app.get("/dbs", (_req, res) => {
  const seen = new Set<string>();
  for (const m of meta.values()) {
    if (m.databaseName) seen.add(m.databaseName);
  }
  res.json([...seen].sort((a, b) => a.localeCompare(b)));
});
// Distinct dialects in the registry, sorted.
app.get("/dbs/types", (_req, res) => {
  const dialects = new Set<string>();
  for (const m of meta.values()) dialects.add(m.dialect);
  res.json([...dialects].sort());
});
// Every registered alias, sorted.
app.get("/dbs/aliases", (_req, res) => {
  res.json([...registry.keys()].sort());
});
// Map of dialect -> unique database names, each list sorted.
app.get("/dbs/list-by-type", (_req, res) => {
  const grouped: Record<string, string[]> = {};
  for (const info of meta.values()) {
    (grouped[info.dialect] ??= []).push(info.databaseName);
  }
  for (const dialect of Object.keys(grouped)) {
    grouped[dialect] = [...new Set(grouped[dialect])].sort((a, b) => a.localeCompare(b));
  }
  res.json(grouped);
});
app.post("/sql/query", async (req, res) => {
try {
const {
db: nameOrAlias,
type,
sql,
params = {},
readOnly = true,
rowLimit = 1000,
} = req.body ?? {};
if (typeof nameOrAlias !== "string" || !nameOrAlias.trim()) {
return res.status(400).json({ error: "Body 'db' is required (alias or database name)." });
}
if (typeof sql !== "string" || !sql.trim()) {
return res.status(400).json({ error: "Body 'sql' is required." });
}
let allowedAliases: string[] = Array.from(registry.keys()); // default (dev)
const sid = req.header("mcp-session-id");
if (sid && sessions.has(sid)) {
allowedAliases = sessions.get(sid)!.allowedAliases;
} else if ((process.env.DEV_ALLOW_HEADER_ROLE ?? "1") === "1") {
const roles = rolesFromReq(req);
const policyPath = process.env.POLICY_FILE ?? "./policies.yaml";
allowedAliases = evaluatePolicyFromFile(policyPath, {
roles,
allAliases: Array.from(registry.keys()),
}).allowedAliases;
}
// Resolve alias
let alias = nameOrAlias;
let db = registry.get(alias);
if (!db) {
const dialect = typeof type === "string" && type ? String(type).trim() : undefined;
const matches = Array.from(meta.entries())
.filter(([_, m]) => m.databaseName === nameOrAlias && (!dialect || m.dialect === dialect));
if (matches.length === 0) {
return res.status(404).json({
error: `Unknown db alias or database name: '${nameOrAlias}'${dialect ? ` (type=${dialect})` : ""}`,
});
}
if (matches.length > 1) {
const hint = matches.map(([a, m]) => `${a} (${m.dialect})`).join(", ");
return res.status(400).json({
error: `Ambiguous database name '${nameOrAlias}'. Provide 'type' (mysql\npg\nmssql\noracle\nsqlite) or use alias. Candidates: ${hint}`,
});
}
[alias] = matches[0];
db = registry.get(alias)!;
}
// Enforce RBAC
if (!allowedAliases.includes(alias)) {
return res.status(403).json({ error: `Forbidden: alias '${alias}' is not allowed for this user/session.` });
}
if (readOnly && !/^\s*select\b/i.test(sql)) {
return res.status(400).json({ error: "readOnly mode: only SELECT is allowed." });
}
const { text, params: mapped } = mapNamedToDriver(sql, params, db.dialect);
const t0 = Date.now();
const { rows, rowCount } = await db.query<Row>(text, mapped);
const ms = Date.now() - t0;
const limited: Row[] = Array.isArray(rows)
? rows.length > rowLimit
? rows.slice(0, rowLimit)
: rows
: [];
res.setHeader("X-DB-Dialect", db.dialect);
res.setHeader("X-Row-Count", String(rowCount ?? limited.length ?? 0));
res.setHeader("X-Elapsed-ms", String(ms));
return res.json(limited);
} catch (err: any) {
console.error(err);
res.status(500).json({ error: String(err?.message ?? err) });
}
});
// ——— MCP per-session transport ———
// POST carries all JSON-RPC traffic. An initialize request without a session
// id creates a new session; everything else must present a valid session id.
app.post("/mcp", async (req, res) => {
  logReq("POST", req);
  const hasSid = !!req.header("mcp-session-id");
  // New session: only an initialize request may arrive without a session id.
  if (!hasSid && isInitializeRequest((req as any).body)) {
    const transport = await createSession(req);
    return transport.handleRequest(req as any, res as any, (req as any).body);
  }
  if (hasSid) {
    const sid = req.header("mcp-session-id")!;
    const sess = sessions.get(sid);
    if (!sess) {
      // Unknown or expired session id -> JSON-RPC error, not a new session.
      return res.status(400).json({
        jsonrpc: "2.0",
        error: { code: -32000, message: "Bad Request: Invalid or expired mcp-session-id" },
        id: null,
      });
    }
    touch(sid); // keep the session alive for the idle evictor
    return sess.transport.handleRequest(req as any, res as any, (req as any).body);
  }
  return res.status(400).json({
    jsonrpc: "2.0",
    error: { code: -32000, message: "Bad Request: No valid session or initialize request" },
    id: null,
  });
});
app.get("/mcp", (req, res) => {
logReq("GET", req);
const r = requireSession(req, res);
if (!r) return;
const { sid, s } = r;
if (!s) return;
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
touch(sid);
return s.transport.handleRequest(req as any, res as any);
});
app.delete("/mcp", async (req, res) => {
logReq("DELETE", req);
const r = requireSession(req, res);
if (!r) return;
const { sid, s } = r;
if (!s) return;
await s.transport.handleRequest(req as any, res as any);
sessions.delete(sid);
console.log(`[MCP] session deleted: ${sid}`);
});
// Periodically evict idle sessions so abandoned clients don't accumulate.
setInterval(() => {
  if (SESSION_TTL_MS <= 0) return; // TTL disabled
  const now = Date.now();
  for (const [sid, s] of sessions) {
    if (now - s.lastSeenAt > SESSION_TTL_MS) {
      sessions.delete(sid);
      // FIX: close the transport so the evicted session's server/stream
      // resources are actually released, not just dropped from the map.
      void Promise.resolve(s.transport.close()).catch((e) =>
        console.warn(`[MCP] error closing evicted session ${sid}:`, e)
      );
      console.log(`[MCP] session evicted (idle): ${sid}`);
    }
  }
}, EVICT_EVERY_MS);
// ——— Boot ———
// Load the registry from dbs.yaml (override the path with SQL_DBS_CONFIG),
// publish it into the module-level state, then start listening.
(async () => {
  const cfgPath = process.env.SQL_DBS_CONFIG ?? "./dbs.yaml";
  const loaded = await loadDbRegistryFromYaml(cfgPath);
  registry = loaded.registry;
  closeAll = loaded.closeAll;
  meta = loaded.meta;
  app.listen(PORT, () => {
    console.log(`HTTP bridge listening on http://localhost:${PORT}`);
    // Startup summary: dialects, display names and aliases that were loaded.
    const types = Array.from(new Set(Array.from(meta.values()).map((m) => m.dialect))).sort();
    const names = Array.from(new Set(Array.from(meta.values()).map((m) => m.databaseName))).sort();
    const aliases = Array.from(registry.keys()).sort();
    console.log(`Available DB types: ${types.join(", ")}`);
    console.log(`Available DB names: ${names.join(", ")}`);
    console.log(`Available DB aliases: ${aliases.join(", ")}`);
    console.log(`[MCP] Per-session server+transport mode is ACTIVE`);
  });
})();
process.on("SIGINT", async () => { await closeAll?.(); process.exit(0); });
process.on("SIGTERM", async () => { await closeAll?.(); process.exit(0); });
```
src/tools/sql/index.ts
```
import { z } from "zod";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import type { DB } from "../../db/provider.js";
import type { DbAliasMeta } from "../../db/registry.js";
import { mapNamedToDriver } from "../../db/paramMap.js";
import { sqlGuardrails } from "./templates.js";
import { excludedOracleTables } from "./unwantedOracle.js";
/* ────────────────────────────────────────────────────────────────────────────
Arg normalization + registration helper (avoid SDK pre-validation)
──────────────────────────────────────────────────────────────────────────── */
// Coerce raw tool arguments into a plain object: JSON strings are parsed
// (falling back to {} on bad JSON), objects pass through unchanged, and
// everything else becomes {}.
function normalizeArgsRaw(argsRaw: unknown): any {
  if (typeof argsRaw === "string") {
    try {
      return JSON.parse(argsRaw);
    } catch {
      return {};
    }
  }
  if (argsRaw !== null && typeof argsRaw === "object") return argsRaw;
  return {};
}
// Always provide the raw Zod shape to the SDK, and compile it locally for
// parsing. The SDK publishes the raw shape as JSON Schema; we validate the
// incoming arguments ourselves with the compiled ZodObject so SDK
// pre-validation is bypassed exactly once, here.
function registerToolNoSchema<
  TShape extends z.ZodRawShape | null | undefined
>(
  server: McpServer,
  name: string,
  meta: { title?: string; description?: string },
  shape: TShape,
  handler: (args: any) => Promise<any>
) {
  const rawShape: z.ZodRawShape = (shape ?? {}) as z.ZodRawShape;
  const validator = z.object(rawShape);
  const wrapped = async (argsRaw: unknown) => {
    const normalized = normalizeArgsRaw(argsRaw);
    return handler(validator.parse(normalized));
  };
  server.registerTool(
    name,
    {
      title: meta.title,
      description: meta.description,
      // IMPORTANT: pass the RAW SHAPE, not a ZodObject
      inputSchema: rawShape,
    },
    wrapped
  );
}
/* ────────────────────────────────────────────────────────────────────────────
   Per-server state
   ──────────────────────────────────────────────────────────────────────────── */
// Aliases registered on each MCP server instance; discovery tools use this
// to scope their results to the session's visible aliases.
const serverAliases = new WeakMap<McpServer, Set<string>>();
// Guards one-time registration of the shared discovery tools per server.
const discoveryRegistered = new WeakSet<McpServer>();
/**
 * Register the SQL tool set for one database alias on an MCP server.
 *
 * Adds (subject to the per-alias `tools` policy flags):
 *  - shared discovery tools (`db.aliases`, `db.types`, `db.names`,
 *    `db.listByType`) — registered once per server, with results scoped to
 *    the aliases registered on that server;
 *  - namespaced `<ns>.sql.schema`, `<ns>.sql.peek`, `<ns>.sql.query`.
 *
 * `dataPolicy` enforces read-only mode, a table allowlist and per-table row
 * filters (binding `:user_id` from `userContext`). When `auditPath` is set,
 * each sql.query execution appends one audit line.
 *
 * FIXES vs previous revision:
 *  - All structured results are returned as `type: "text"` carrying JSON;
 *    "json" is not a content type defined by the MCP spec (the three
 *    discovery tools had already been converted — the rest now match).
 *  - The db.listByType `type` description used literal "\n" escapes where
 *    "|" separators were intended.
 */
export function registerSqlTools(
  server: McpServer,
  {
    db,
    auditPath,
    ns,
    meta,
    registry,
    tools,
    dataPolicy,
    userContext,
    discoveryVisible,
  }: {
    db: DB;
    auditPath?: string;
    ns?: string;
    meta: Map<string, DbAliasMeta>;
    registry: Map<string, DB>;
    tools?: { schema?: boolean; peek?: boolean; query?: boolean };
    dataPolicy?: {
      readOnly?: boolean;
      tableAllow?: string[];
      rowFilters?: Record<string, string>;
    };
    userContext?: { user_id?: string };
    discoveryVisible?: boolean;
  }
) {
  const name = (base: string) => (ns ? `${ns}.${base}` : base);
  // Track aliases served in this session
  if (ns) {
    const set = serverAliases.get(server) ?? new Set<string>();
    set.add(ns);
    serverAliases.set(server, set);
  }
  /* ────────────────────────────────────────────────────────────────────────
     Discovery tools (registered once per server)
     ──────────────────────────────────────────────────────────────────────── */
  if (!discoveryRegistered.has(server)) {
    discoveryRegistered.add(server);
    if (discoveryVisible !== false) {
      // Metadata entries for aliases visible to this server/session only.
      const metaVisible = (): DbAliasMeta[] => {
        const allowed = serverAliases.get(server) ?? new Set<string>();
        const out: DbAliasMeta[] = [];
        for (const [alias, m] of meta.entries()) if (allowed.has(alias)) out.push({ ...m });
        return out;
      };
      // db.aliases (no args) -> JSON array serialized as text
      registerToolNoSchema(
        server,
        "db.aliases",
        {
          title: "List database aliases",
          description: "Return the list of available database aliases visible to this session.",
        },
        null,
        async () => {
          const set = serverAliases.get(server) ?? new Set<string>();
          const aliases = Array.from(set).sort();
          return { content: [{ type: "text", text: JSON.stringify(aliases) }] };
        }
      );
      // db.types (no args) -> JSON array serialized as text
      registerToolNoSchema(
        server,
        "db.types",
        {
          title: "List available database (types)",
          description: "List available database dialects (types) visible in this session.",
        },
        null,
        async () => {
          const visible = metaVisible();
          const types = Array.from(new Set(visible.map((m) => m.dialect))).sort();
          return { content: [{ type: "text", text: JSON.stringify(types) }] };
        }
      );
      // db.names (no args) -> JSON array serialized as text
      registerToolNoSchema(
        server,
        "db.names",
        {
          title: "List database names",
          description: "List database names (not aliases) visible in this session (unique, sorted).",
        },
        null,
        async () => {
          const visible = metaVisible();
          const names = Array.from(
            new Set(visible.map((m) => m.databaseName).filter(Boolean))
          ).sort((a, b) => a.localeCompare(b));
          return { content: [{ type: "text", text: JSON.stringify(names) }] };
        }
      );
      // db.listByType (args) -> JSON serialized as text
      const LIST_BY_TYPE = {
        // FIX: separators were literal "\n" escapes; use "|".
        type: z.string().min(1).describe("Dialect: mysql|pg|mssql|oracle|sqlite"),
        unique: z.boolean().optional().default(true),
        includeAliases: z.boolean().optional().default(false),
      } satisfies z.ZodRawShape;
      registerToolNoSchema(
        server,
        "db.listByType",
        {
          title: "List databases by type",
          description:
            "List database names for a given dialect. unique=true returns unique names; set unique=false for one row per alias; includeAliases=true to add alias.",
        },
        LIST_BY_TYPE,
        async ({ type, unique, includeAliases }) => {
          const dialect = String(type ?? "").trim();
          if (!dialect) {
            const err = { error: "Missing required 'type'." };
            // FIX: "json" is not a valid MCP content type.
            return { isError: true, content: [{ type: "text", text: JSON.stringify(err) }] };
          }
          const allowed = serverAliases.get(server) ?? new Set<string>();
          const visible = [...meta.entries()]
            .filter(([alias]) => allowed.has(alias))
            .map(([, m]) => m)
            .filter((m) => m.dialect === dialect);
          if (unique) {
            const names = Array.from(
              new Set(visible.map((i) => i.databaseName).filter(Boolean))
            ).sort((a, b) => a.localeCompare(b));
            return { content: [{ type: "text", text: JSON.stringify(names) }] };
          }
          const rows = visible
            .map((i) => (includeAliases ? { alias: i.alias, name: i.databaseName } : { name: i.databaseName }))
            // NOTE(review): summing two localeCompare results is an unusual
            // tiebreak; confirm the intended ordering is (name, then alias).
            .sort(
              (a: any, b: any) =>
                String(a.name).localeCompare(String(b.name)) +
                (a.alias !== undefined && b.alias !== undefined
                  ? String(a.alias).localeCompare(String(b.alias))
                  : 0)
            );
          return { content: [{ type: "text", text: JSON.stringify(rows) }] };
        }
      );
    }
  }
  // Append one line to the audit log (no-op unless auditPath is configured).
  async function audit(line: string) {
    if (!auditPath) return;
    const fs = await import("node:fs/promises");
    await fs.appendFile(auditPath, line + "\n", "utf8");
  }
  /* ────────────────────────────────────────────────────────────────────────
     Namespaced SQL tools
     ──────────────────────────────────────────────────────────────────────── */
  // sql.schema (no args) -> Markdown only
  if (tools?.schema !== false) {
    registerToolNoSchema(
      server,
      name("sql.schema"),
      {
        title: "Describe schema",
        description:
          "Return a compact Markdown outline of tables and columns for the chosen database.",
      },
      null,
      async () => {
        const md = await describeSchema(db);
        return { content: [{ type: "text", text: md }] };
      }
    );
  }
  // sql.peek (args) -> single-type output
  if (tools?.peek !== false) {
    const PEEK_SHAPE = {
      maxRowsPerTable: z.number().int().min(1).max(10000).optional().default(50),
      as: z.enum(["markdown", "json"]).optional().default("markdown"),
    } satisfies z.ZodRawShape;
    registerToolNoSchema(
      server,
      name("sql.peek"),
      {
        title: "Peek into database content",
        description: [
          "Return up to N rows from each base table in the chosen database.",
          "Dialect-aware and read-only. Use this to quickly inspect unknown schemas.",
        ].join("\n"),
      },
      PEEK_SHAPE,
      async ({ maxRowsPerTable, as }) => {
        const tables = await listTables(db);
        const safeTables = Array.from(
          new Set(tables.filter((t): t is string => typeof t === "string" && t.length > 0))
        );
        if (!safeTables.length) {
          // FIX: "json" content type replaced with JSON-serialized text.
          return as === "json"
            ? { content: [{ type: "text", text: JSON.stringify([]) }] }
            : { content: [{ type: "text", text: "_(no tables)_" }] };
        }
        const dump = await dumpTables(db, safeTables, maxRowsPerTable!);
        if (as === "json") {
          return { content: [{ type: "text", text: JSON.stringify(dump) }] };
        }
        const md = dump
          .map(({ table, rows }) => `## ${table}\n\n${toMarkdown(rows)}`)
          .join("\n\n");
        return { content: [{ type: "text", text: md }] };
      }
    );
  }
  // sql.query (args) -> single-type output
  if (tools?.query !== false) {
    const QUERY_SHAPE = {
      sql: z.string(),
      params: z.record(z.any()).optional().default({}),
      readOnly: z.boolean().optional().default(true),
      rowLimit: z.number().int().min(1).max(10000).optional().default(1000),
      as: z.enum(["json", "markdown"]).optional().default("json"),
    } satisfies z.ZodRawShape;
    registerToolNoSchema(
      server,
      name("sql.query"),
      {
        title: "Execute SQL",
        description: ["Execute a parameterized SQL query against the chosen database.", "", "**Usage Tips:**", sqlGuardrails()].join("\n"),
      },
      QUERY_SHAPE,
      async ({ sql, params = {}, readOnly = true, rowLimit = 1000, as = "json" }) => {
        // 1) readOnly (policy overrides user input)
        const effectiveReadOnly = dataPolicy?.readOnly ?? readOnly;
        if (effectiveReadOnly && !/^\s*select\b/i.test(sql)) {
          throw new Error("readOnly mode: only SELECT is allowed.");
        }
        // Only block when a non-empty user_id is explicitly provided and it
        // differs from the session identity.
        const userIdArgPresent =
          params != null &&
          Object.prototype.hasOwnProperty.call(params, "user_id") &&
          params.user_id != null &&
          String(params.user_id).trim() !== "";
        if (dataPolicy?.rowFilters && userIdArgPresent) {
          const arg = String(params.user_id).trim();
          const sessionUid = String(userContext?.user_id ?? "").trim();
          if (arg !== sessionUid) {
            throw new Error("I'm sorry, you don't have permission to access this data.");
          }
        }
        // 2) table allowlist + 3) row filters
        let effectiveSql = sql;
        let effectiveParams: Record<string, any> = { ...(params ?? {}) };
        if ((dataPolicy?.tableAllow?.length || dataPolicy?.rowFilters)) {
          const base = detectBaseTable(sql);
          if (base) {
            // Strip quoting around the final name part for comparisons.
            const lastPart = base.split(".").pop()!;
            const bare = lastPart.replace(/^[\[\]"'`]+|[\[\]"'`]+$/g, "").toLowerCase();
            // table allowlist
            if (dataPolicy?.tableAllow?.length) {
              const ok = dataPolicy.tableAllow.map((t) => t.toLowerCase()).includes(bare);
              if (!ok) throw new Error("I'm sorry, you don't have permission to access this table.");
            }
            // row filters
            const filter = dataPolicy?.rowFilters?.[bare];
            if (filter) {
              if (/:user_id\b/.test(filter) && !userContext?.user_id) {
                throw new Error("Missing user identity (user_id) for row-level policy.");
              }
              effectiveSql = addWhere(effectiveSql, filter);
              if (userContext?.user_id !== undefined) {
                effectiveParams = { ...effectiveParams, user_id: userContext.user_id };
              }
            }
          }
        }
        // 4) execute
        const { text, params: mapped } = mapNamedToDriver(effectiveSql, effectiveParams, db.dialect);
        const t0 = Date.now();
        const { rows, rowCount } = await db.query(text, mapped);
        const ms = Date.now() - t0;
        const limited = Array.isArray(rows) && rows.length > rowLimit ? rows.slice(0, rowLimit) : rows;
        await audit(`[${new Date().toISOString()}] ${db.dialect} rows=${rowCount ?? limited?.length ?? 0} ms=${ms} sql=${effectiveSql}`);
        if (as === "markdown") {
          return { content: [{ type: "text", text: toMarkdown(limited) }] };
        }
        // FIX: "json" content type replaced with JSON-serialized text.
        return { content: [{ type: "text", text: JSON.stringify(limited) }] };
      }
    );
  }
}
/* ────────────────────────────────────────────────────────────────────────────
   Helper functions (unchanged except markdown table layout + :user_id fix)
   ──────────────────────────────────────────────────────────────────────────── */
// Render result rows as a simple Markdown table; headers come from the keys
// of the first row.
function toMarkdown(rows: any[]) {
  if (!rows?.length) return "_(no rows)_";
  // Cell formatting: blank for null/undefined, fenced JSON for objects.
  const render = (v: unknown): string => {
    if (v === null || v === undefined) return "";
    if (typeof v === "object") return "```json\n" + JSON.stringify(v) + "\n```";
    return String(v);
  };
  const headers = Object.keys(rows[0]);
  const headerLine = `${headers.join(" | ")}\n`;
  const sepLine = `${headers.map(() => "---").join(" | ")}\n`;
  const bodyLines = rows
    .map((row) => headers.map((h) => render(row[h])).join(" | "))
    .join("\n");
  return headerLine + sepLine + bodyLines;
}
// Format one table cell: blank for null/undefined, fenced JSON for objects,
// String(...) for everything else.
function fmt(v: unknown) {
  if (v == null) return "";
  if (typeof v === "object") return "```json\n" + JSON.stringify(v) + "\n```";
  return String(v);
}
// Quote a bare identifier for the given dialect, doubling any embedded
// closing-quote characters.
function quoteIdent(dialect: DB["dialect"], ident: string) {
  switch (dialect) {
    case "mysql":
      return "`" + ident.replace(/`/g, "``") + "`";
    case "mssql":
      return "[" + ident.replace(/]/g, "]]") + "]";
    case "pg":
    case "oracle":
    case "sqlite":
      return '"' + ident.replace(/"/g, '""') + '"';
  }
}
// Quote an identifier that may be schema-qualified ("schema.table").
// NOTE(review): only the first two dot-separated parts are used; anything
// after a second dot is dropped — confirm callers only pass schema.table.
function quoteMaybeQualified(dialect: DB["dialect"], ident: string) {
  if (!ident.includes(".")) return quoteIdent(dialect, ident);
  const parts = ident.split(".");
  const schema = parts[0];
  const name = parts[1];
  return `${quoteIdent(dialect, schema)}.${quoteIdent(dialect, name)}`;
}
// List base-table names for the connected database, per dialect.
// System, temporary and recycle-bin objects are filtered out where the
// dialect requires it (Oracle also honors the static exclusion list).
async function listTables(dbX: DB): Promise<string[]> {
  const names = (rows: Array<{ table_name: string }>) => rows.map((r) => r.table_name);
  switch (dbX.dialect) {
    case "pg": {
      const sql = `
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
ORDER BY table_name`;
      const { rows } = await dbX.query<{ table_name: string }>(sql, []);
      return names(rows);
    }
    case "mysql": {
      const sql = `
SELECT TABLE_NAME AS table_name
FROM information_schema.tables
WHERE table_schema = DATABASE() AND TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_NAME`;
      const { rows } = await dbX.query<{ table_name: string }>(sql, []);
      return names(rows);
    }
    case "mssql": {
      const sql = `
SELECT TABLE_SCHEMA AS table_schema, TABLE_NAME AS table_name
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_SCHEMA, TABLE_NAME`;
      const { rows } = await dbX.query<{ table_schema: string; table_name: string }>(sql, []);
      return names(rows);
    }
    case "oracle": {
      const quoted = excludedOracleTables.map((name) => `'${name.toUpperCase()}'`).join(", ");
      const sql = `
SELECT table_name AS "table_name"
FROM user_tables
WHERE temporary = 'N'
AND table_name NOT LIKE 'ROLLING$%'
AND table_name NOT LIKE 'SCHEDULER_%'
${excludedOracleTables.length ? `AND table_name NOT IN (${quoted})` : ""}
AND table_name NOT IN (SELECT object_name FROM user_recyclebin)
ORDER BY table_name`;
      const { rows } = await dbX.query<{ table_name: string }>(sql, []);
      return names(rows);
    }
    case "sqlite": {
      const sql = `
SELECT name AS table_name
FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name`;
      const { rows } = await dbX.query<{ table_name: string }>(sql, []);
      return names(rows);
    }
  }
}
/**
 * Fetch up to `maxRows` rows from each of the given tables and return
 * `{ table, rows }` pairs in input order.
 *
 * Fix: `maxRows` is clamped to a non-negative integer before use. It was
 * previously interpolated verbatim into the SQL text for mssql
 * (`TOP (${maxRows})`), so a NaN, a float, a negative value, or any
 * non-numeric content would produce broken (or injectable) SQL. The
 * other dialects bind it, but now all five receive the same sane value.
 */
async function dumpTables(dbX: DB, tables: string[], maxRows: number) {
  const limit = Math.max(0, Math.floor(Number.isFinite(maxRows) ? maxRows : 0));
  const result: { table: string; rows: any[] }[] = [];
  for (const t of tables) {
    const qTable = quoteMaybeQualified(dbX.dialect, t);
    let sql: string;
    let params: any;
    switch (dbX.dialect) {
      case "pg": { sql = `SELECT * FROM ${qTable} LIMIT $1`; params = [limit]; break; }
      case "mysql":
      case "sqlite": { sql = `SELECT * FROM ${qTable} LIMIT ?`; params = [limit]; break; }
      // `limit` is a validated integer, so inlining it into TOP() is safe.
      case "mssql": { sql = `SELECT TOP (${limit}) * FROM ${qTable}`; params = []; break; }
      case "oracle": { sql = `SELECT * FROM ${qTable} WHERE ROWNUM <= :n`; params = { n: limit }; break; }
    }
    const { rows } = await dbX.query<any>(sql, params);
    // Defensive slice in case a driver ignores the limit clause/bind.
    result.push({ table: t, rows: Array.isArray(rows) ? rows.slice(0, limit) : [] });
  }
  return result;
}
/**
 * Run a metadata query and fold its rows into a markdown schema summary.
 * Each row contributes one "column type" bullet under a "### table"
 * heading; rows missing a table or column value are skipped. Returns
 * "_(no tables)_" when nothing usable comes back.
 */
async function describeViaQuery<T extends Record<string, any>>(
  dbX: DB,
  sql: string,
  tableKey: string,
  columnKey: string,
  typeKey: string
): Promise<string> {
  const { rows } = await dbX.query<T>(sql, []);
  const byTable = new Map<string, string[]>();
  for (const row of rows) {
    const table = (row as any)[tableKey];
    const column = (row as any)[columnKey];
    const dataType = (row as any)[typeKey];
    if (!table || !column) continue;
    // trim() drops the trailing space left when the type is null/absent.
    const entry = `${column} ${dataType ?? ""}`.trim();
    const existing = byTable.get(table);
    if (existing) {
      existing.push(entry);
    } else {
      byTable.set(table, [entry]);
    }
  }
  const sections: string[] = [];
  for (const [table, cols] of byTable) {
    sections.push(`### ${table}\n- ${cols.join("\n- ")}`);
  }
  return sections.join("\n\n") || "_(no tables)_";
}
async function describeSchema(dbX: DB) {
const tables = await listTables(dbX);
const safeTables = Array.from(new Set(tables.filter((t): t is string => typeof t === "string" && t.length > 0)));
if (!safeTables.length) return "_(no tables)_";
switch (dbX.dialect) {
case "pg": {
const inList = safeTables.map((t) => `'${t}'`).join(", ");
const sql = `
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name IN (${inList})
ORDER BY table_name, ordinal_position`;
return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type");
}
case "mysql": {
const inList = safeTables.map((t) => `'${t}'`).join(", ");
const sql = `
SELECT TABLE_NAME AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type
FROM information_schema.columns
WHERE table_schema = DATABASE() AND TABLE_NAME IN (${inList})
ORDER BY TABLE_NAME, ORDINAL_POSITION`;
return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type");
}
case "mssql": {
const q = safeTables.map((t) => {
if (t.includes(".")) {
const [schema, name] = t.split(".");
return { schema: schema.replace(/'/g, "''"), name: name.replace(/'/g, "''") };
}
return { schema: null as string | null, name: t.replace(/'/g, "''") };
});
const hasSchema = q.some((x) => !!x.schema);
let sql: string;
if (hasSchema) {
const orConds = q
.map((x) =>
x.schema
? `(TABLE_SCHEMA = '${x.schema}' AND TABLE_NAME = '${x.name}')`
: `(TABLE_NAME = '${x.name}')`
)
.join(" OR ");
sql = `
SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type
FROM INFORMATION_SCHEMA.COLUMNS
WHERE ${orConds}
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION`;
} else {
const inList = q.map((x) => `'${x.name}'`).join(", ");
sql = `
SELECT TABLE_NAME AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME IN (${inList})
ORDER BY TABLE_NAME, ORDINAL_POSITION`;
}
return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type");
}
case "oracle": {
const inList = safeTables.map((t) => `'${t.toUpperCase()}'`).join(", ");
const sql = `
SELECT
table_name AS "table_name",
column_name AS "column_name",
CASE
WHEN data_type IN ('VARCHAR2','NVARCHAR2','CHAR','NCHAR') AND data_length IS NOT NULL
THEN data_type || '(' || data_length || ')'
WHEN data_type = 'NUMBER' AND data_precision IS NOT NULL
THEN data_type || '(' || data_precision || NVL2(data_scale, ',' || data_scale, '') || ')'
ELSE data_type
END AS "data_type"
FROM user_tab_columns
WHERE UPPER(table_name) IN (${inList})
ORDER BY table_name, column_id`;
return await describeViaQuery<Record<string, any>>(dbX, sql, "table_name", "column_name", "data_type");
}
case "sqlite": {
const parts: string[] = [];
for (const t of safeTables) {
const pragma = `PRAGMA table_info(${quoteIdent(dbX.dialect, t)});`;
const { rows } = await dbX.query<{ name: string; type: string }>(pragma, []);
if (!rows?.length) continue;
const body = rows.map((r) => `- ${r.name} \`${r.type}\``).join("\n");
parts.push(`## ${t}\n\n${body}`);
}
return parts.join("\n\n") || "_(no tables)_";
}
}
}
/**
 * Best-effort extraction of the first table reference following a FROM
 * keyword, or null when no FROM clause is present. Quoted/bracketed and
 * dot-qualified identifiers are captured verbatim.
 */
function detectBaseTable(sql: string): string | null {
  // Collapse all whitespace so FROM and its operand sit on one line.
  const normalized = sql.replace(/\s+/g, " ");
  const match = /\bfrom\s+([A-Za-z0-9_."`\[\]]+)/i.exec(normalized);
  return match?.[1] ?? null;
}
/**
 * Inject `filter` into a SELECT statement, just before the first
 * ORDER BY / LIMIT / OFFSET / FETCH clause (or at the end when none
 * exist). If the head already contains a WHERE, the filter is ANDed in
 * parentheses; otherwise a new WHERE clause is added. The result may
 * contain doubled spaces, which SQL engines ignore.
 */
function addWhere(sql: string, filter: string): string {
  const clauseStarts = [/\border\s+by\b/i, /\blimit\b/i, /\boffset\b/i, /\bfetch\b/i]
    .map((re) => sql.search(re))
    .filter((i) => i >= 0);
  const cut = clauseStarts.length ? Math.min(...clauseStarts) : sql.length;
  const head = sql.slice(0, cut);
  const tail = sql.slice(cut);
  return /\bwhere\b/i.test(head)
    ? `${head} AND (${filter}) ${tail}`
    : `${head} WHERE ${filter} ${tail}`;
}
```
src/tools/sql/templates.ts
```
/**
 * Guardrail text shown to the model before it writes SQL: one numbered
 * rule per line, joined with newlines.
 */
export function sqlGuardrails(): string {
  const rules = [
    "Use a single SELECT statement.",
    "Always use :name placeholders (e.g., :from, :limit).",
    "Avoid INSERT, UPDATE, DELETE unless explicitly allowed.",
    "Use exact table/column names (call `sql.schema` first if unsure).",
    "Add LIMIT/TOP/ROWNUM to keep results small.",
    "Prefer ANSI SQL over vendor-specific syntax.",
  ];
  return rules.map((rule, i) => `${i + 1}. ${rule}`).join("\n");
}
```
src/tools/sql/unwantedOracle.ts
```
// Oracle seeds a new schema with many infrastructure tables (Advanced
// Queueing, LogMiner, logical standby, MV advisor, stored outlines,
// rolling upgrade, SQL*Plus support). listTables() excludes these names
// (matched uppercased) so sql.schema / sql.peek surface only application
// tables. This module only exports the name list; nothing here drops
// tables.
export const excludedOracleTables = [
// Advanced Queueing (AQ) infrastructure.
"AQ$_INTERNET_AGENTS",
"AQ$_INTERNET_AGENT_PRIVS",
"AQ$_KEY_SHARD_MAP",
"AQ$_QUEUES",
"AQ$_QUEUE_TABLES",
"AQ$_SCHEDULES",
// SQL*Plus HELP table.
"HELP",
"LOGMNRC_CONCOL_GG",
"LOGMNRC_CON_GG",
"LOGMNRC_DBNAME_UID_MAP",
"LOGMNRC_GSBA",
// NOTE(review): "ORDERS" looks like an application table name, not an
// Oracle system table — confirm this exclusion does not hide user data.
"ORDERS",
// MVIEW_* (no $) — presumably summary-advisor repository tables; verify.
"MVIEW_WORKLOAD",
"MVIEW_RECOMMENDATIONS",
"MVIEW_LOG",
"MVIEW_FILTERINSTANCE",
"MVIEW_FILTER",
"MVIEW_EXCEPTIONS",
"MVIEW_EVALUATIONS",
// LogMiner dictionary/session tables (LOGMNR*, LOGMNRC*, LOGMNRGGC*).
"LOGMNR_SHARD_TS",
"LOGMNRC_GSII",
"LOGMNRC_GTCS",
"LOGMNRC_GTLO",
"LOGMNRC_INDCOL_GG",
"LOGMNRC_IND_GG",
"LOGMNRC_SEQ_GG",
"LOGMNRC_SHARD_TS",
"LOGMNRC_TS",
"LOGMNRC_TSPART",
"LOGMNRC_USER",
"LOGMNRGGC_GTCS",
"LOGMNRGGC_GTLO",
"LOGMNRP_CTAS_PART_MAP",
"LOGMNRT_MDDL$",
"LOGMNR_AGE_SPILL$",
"LOGMNR_ATTRCOL$",
"LOGMNR_ATTRIBUTE$",
"LOGMNR_CCOL$",
"LOGMNR_CDEF$",
"LOGMNR_COL$",
"LOGMNR_COLTYPE$",
"LOGMNR_CON$",
"LOGMNR_CONTAINER$",
"LOGMNR_DICTIONARY$",
"LOGMNR_DICTSTATE$",
"LOGMNR_DID$",
"LOGMNR_ENC$",
"LOGMNR_ERROR$",
"LOGMNR_FILTER$",
"LOGMNR_GLOBAL$",
"LOGMNR_GT_TAB_INCLUDE$",
"LOGMNR_GT_USER_INCLUDE$",
"LOGMNR_GT_XID_INCLUDE$",
"LOGMNR_ICOL$",
"LOGMNR_IDNSEQ$",
"LOGMNR_IND$",
"LOGMNR_INDCOMPART$",
"LOGMNR_INDPART$",
"LOGMNR_INDSUBPART$",
"LOGMNR_KOPM$",
"LOGMNR_LOB$",
"LOGMNR_LOBFRAG$",
"LOGMNR_LOG$",
"LOGMNR_LOGMNR_BUILDLOG",
"LOGMNR_NTAB$",
"LOGMNR_OBJ$",
"LOGMNR_OPQTYPE$",
"LOGMNR_PARAMETER$",
"LOGMNR_PARTOBJ$",
"LOGMNR_PDB_INFO$",
"LOGMNR_PROCESSED_LOG$",
"LOGMNR_PROFILE_PLSQL_STATS$",
"LOGMNR_PROFILE_TABLE_STATS$",
"LOGMNR_PROPS$",
"LOGMNR_REFCON$",
"LOGMNR_RESTART_CKPT$",
"LOGMNR_RESTART_CKPT_TXINFO$",
"LOGMNR_SEED$",
"LOGMNR_SESSION$",
"LOGMNR_SESSION_ACTIONS$",
"LOGMNR_SESSION_EVOLVE$",
"LOGMNR_SPILL$",
"LOGMNR_SUBCOLTYPE$",
"LOGMNR_TAB$",
"LOGMNR_TABCOMPART$",
"LOGMNR_TABPART$",
"LOGMNR_TABSUBPART$",
"LOGMNR_TS$",
"LOGMNR_TYPE$",
"LOGMNR_UID$",
"LOGMNR_USER$",
// Logical standby (Data Guard) tables.
"LOGSTDBY$APPLY_MILESTONE",
"LOGSTDBY$APPLY_PROGRESS",
"LOGSTDBY$EDS_TABLES",
"LOGSTDBY$EVENTS",
"LOGSTDBY$FLASHBACK_SCN",
"LOGSTDBY$HISTORY",
"LOGSTDBY$PARAMETERS",
"LOGSTDBY$PLSQL",
"LOGSTDBY$SCN",
"LOGSTDBY$SKIP",
"LOGSTDBY$SKIP_SUPPORT",
"LOGSTDBY$SKIP_TRANSACTION",
// Materialized-view advisor working tables.
"MVIEW$_ADV_AJG",
"MVIEW$_ADV_BASETABLE",
"MVIEW$_ADV_CLIQUE",
"MVIEW$_ADV_ELIGIBLE",
"MVIEW$_ADV_EXCEPTIONS",
"MVIEW$_ADV_FILTER",
"MVIEW$_ADV_FILTERINSTANCE",
"MVIEW$_ADV_FJG",
"MVIEW$_ADV_GC",
"MVIEW$_ADV_INFO",
"MVIEW$_ADV_JOURNAL",
"MVIEW$_ADV_LEVEL",
"MVIEW$_ADV_LOG",
"MVIEW$_ADV_OUTPUT",
"MVIEW$_ADV_PARAMETERS",
"MVIEW$_ADV_PLAN",
"MVIEW$_ADV_PRETTY",
"MVIEW$_ADV_ROLLUP",
"MVIEW$_ADV_SQLDEPEND",
"MVIEW$_ADV_TEMP",
"MVIEW$_ADV_WORKLOAD",
// Stored outline tables.
"OL$",
"OL$HINTS",
"OL$NODES",
// SQL*Plus product profile.
"PRODUCT_PRIVS",
// Replication / redo support tables.
"REDO_DB",
"REDO_LOG",
"REPL_SUPPORT_MATRIX",
"REPL_VALID_COMPAT",
// Rolling upgrade (DBMS_ROLLING) tables.
"ROLLING$CONNECTIONS",
"ROLLING$DATABASES",
"ROLLING$DIRECTIVES",
"ROLLING$EVENTS",
"ROLLING$PARAMETERS",
"ROLLING$PLAN",
"ROLLING$STATISTICS",
"ROLLING$STATUS",
// Scheduler argument tables.
"SCHEDULER_JOB_ARGS_TBL",
"SCHEDULER_PROGRAM_ARGS_TBL",
"SQLPLUS_PRODUCT_PROFILE"
];
```
src/policy/index.ts:
```
// src/policy/index.ts
import fs from "node:fs";
import * as yaml from "js-yaml";
/** Parsed shape of the policies YAML: role name -> allowed DB aliases. */
export type PolicyFile = {
roleBindings?: Record<string, { allow?: { aliases?: string[] } }>;
};
// Single-entry parse cache keyed by path + mtime, so repeated policy
// evaluations do not re-read/re-parse the YAML unless the file changed.
let cached: { mtimeMs: number; path: string; policy: PolicyFile } | null = null;
/**
 * Read and parse the policy YAML at `path`, reusing the module-level
 * cached parse when the file's mtime is unchanged since the last read.
 */
function loadYaml(path: string): PolicyFile {
  const { mtimeMs } = fs.statSync(path);
  if (cached && cached.path === path && cached.mtimeMs === mtimeMs) {
    return cached.policy;
  }
  const parsed = yaml.load(fs.readFileSync(path, "utf8")) as PolicyFile;
  cached = { mtimeMs, path, policy: parsed };
  return parsed;
}
/** Input to evaluatePolicyFromFile. */
export type EvalInput = {
roles: string[]; // roles of the caller, e.g., ['customer'] or ['merchant_admin']
allAliases: string[]; // every alias this server hosts, e.g. Array.from(registry.keys())
};
/** Result of evaluatePolicyFromFile: the accessible aliases, sorted. */
export type EvalOutput = {
allowedAliases: string[];
};
/**
 * Compute which DB aliases the given roles may access according to the
 * roleBindings section of the policy file. A binding containing "*"
 * grants every alias the server hosts. The result is restricted to
 * aliases that actually exist on this server and returned sorted.
 */
export function evaluatePolicyFromFile(path: string, input: EvalInput): EvalOutput {
  const bindings = loadYaml(path).roleBindings ?? {};
  const granted = new Set<string>();
  for (const role of input.roles) {
    const aliases = bindings[role]?.allow?.aliases ?? [];
    if (aliases.includes("*")) {
      for (const alias of input.allAliases) granted.add(alias);
    } else {
      for (const alias of aliases) granted.add(alias);
    }
  }
  // Drop aliases the server does not actually host, then sort.
  const allowedAliases = [...granted].filter((a) => input.allAliases.includes(a)).sort();
  return { allowedAliases };
}
// -----------------------------------------------------------------------------
// Optional: tool-level policy resolution (used by http.ts when present)
// -----------------------------------------------------------------------------
// src/policy/index.ts
/** Per-tool enable flags for one DB alias. */
export type ToolsAllowed = { schema: boolean; peek: boolean; query: boolean };
/** Resolved tool policy for one alias after default + role overrides. */
export type ToolsPolicyResult = {
tools: ToolsAllowed;
readOnly?: boolean; // when true, callers should permit read statements only
tableAllow?: string[]; // whitelist of table names; absent means no restriction
rowFilters?: Record<string, string>; // table name -> SQL predicate with :params
};
export function evaluateToolsPolicyFromFile(
path: string,
input: { roles: string[]; aliases: string[] }
): Record<string, ToolsPolicyResult> {
const doc: any = loadYaml(path) || {};
const tp = doc.toolPolicies ?? {};
const out: Record<string, ToolsPolicyResult> = {};
for (const alias of input.aliases) {
const spec = tp[alias];
if (!spec) continue;
// Start from default (if present)
const dList = Array.isArray(spec.default?.tools) ? (spec.default.tools as string[]) : undefined;
let result: ToolsPolicyResult = {
tools: dList
? { schema: dList.includes("sql.schema"), peek: dList.includes("sql.peek"), query: dList.includes("sql.query") }
: { schema: true, peek: true, query: true },
readOnly: spec.default?.readOnly,
tableAllow: spec.default?.tableAllow,
rowFilters: spec.default?.rowFilters,
};
// Apply byRole overrides (last matching role wins)
const byRole = spec.byRole ?? {};
for (const r of input.roles) {
const rs = byRole[r];
if (!rs) continue;
if (Array.isArray(rs.tools)) {
result.tools = {
schema: rs.tools.includes("sql.schema"),
peek: rs.tools.includes("sql.peek"),
query: rs.tools.includes("sql.query"),
};
}
if (typeof rs.readOnly === "boolean") result.readOnly = rs.readOnly;
if (Array.isArray(rs.tableAllow)) result.tableAllow = rs.tableAllow;
if (rs.rowFilters && typeof rs.rowFilters === "object") result.rowFilters = rs.rowFilters;
}
out[alias] = result;
}
return out;
}
```
policies.yaml (root folder):
```
# Which aliases each role may access
# (Use comma-separated roles in dev header X-Role to simulate multiple)
# Role Bindings - Define by roles and which database it can access
roleBindings:
# Admin can access everything
admin:
allow:
aliases: ["*"]
# Customer policies - customer may access both customer and merchant DBs; customer_admin only the customer DB
customer:
allow:
aliases: [customer_db, merchant_db]
customer_admin:
allow:
aliases: [customer_db]
# Merchant policies
merchant:
allow:
aliases: [merchant_db]
merchant_admin:
allow:
aliases: [merchant_db]
# Tool Policies - Which tools a role can use per alias (optional)
toolPolicies:
customer_db:
default:
tools: [sql.schema, sql.peek, sql.query]
byRole:
customer:
# tools: [sql.query]
tools: [sql.schema, sql.query] # allow schema introspection in addition to query (no peek)
readOnly: true
tableAllow: ["users","purchase_history","points_history"]
rowFilters:
users: "user_id = :user_id"
purchase_history: "user_id = :user_id"
points_history: "user_id = :user_id"
allow:
aliases: [customer_db]
merchant_db:
default:
tools: [sql.schema, sql.peek, sql.query]
byRole:
merchant:
# tools: [sql.query]
tools: [sql.schema, sql.query] # same as customer_db above: schema + query only (no peek)
readOnly: true
tableAllow: ["merchants", "items", "purchase_history"]
rowFilters:
items: "merchant_id = :user_id"
allow:
aliases: [customer_db, merchant_db]
customer:
tools: [sql.schema, sql.query]
readOnly: true
tableAllow: ["items"]
```
Postman (All commented but I just uncomment each section when I wanna use):
```
// Initialize Handshake
// {
// "jsonrpc": "2.0",
// "id": "2",
// "method": "initialize",
// "params": {
// "protocolVersion": "2025-03-26",
// "clientInfo": { "name": "postman", "version": "1.0.0" },
// "capabilities": { "roots": { "listChanged": true }, "sampling": {}, "tools": {} }
// }
// }
// Tell the server the client is ready for normal operations - Take the mcp-session id under the response header in the Postman
// {
// "jsonrpc": "2.0",
// "method": "notifications/initialized"
// }
// List Available tools
// {
// "jsonrpc": "2.0",
// "id": "2",
// "method": "tools/list",
// "params": {}
// }
// -----------------------------------------------------------------------------------------------------------
// List of DB Types Available
// {
// "jsonrpc": "2.0",
// "id": "3",
// "method": "tools/call",
// "params": {
// "name": "db.types",
// "arguments": {
// }
// }
// }
// List of DB Names List
// {
// "jsonrpc": "2.0",
// "id": "3",
// "method": "tools/call",
// "params": {
// "name": "db.names",
// "arguments": {}
// }
// }
// // List of DB Aliases list
// {
// "jsonrpc": "2.0",
// "id": "10",
// "method": "tools/call",
// "params": {
// "name": "db.aliases",
// "arguments": {}
// }
// }
// List of DB available based on type:
// {
// "jsonrpc": "2.0",
// "id": "3",
// "method": "tools/call",
// "params": {
// "name": "db.listByType",
// "arguments": { "type": "mssql" }
// }
// }
// -----------------------------------------------------------------------------------------------------------
// Call peek the database
// {
// "jsonrpc": "2.0",
// "id": "7",
// "method": "tools/call",
// "params": {
// "name": "mssql_2.sql.peek",
// "arguments": {
// "maxRowsPerTable": 5,
// "as": "markdown"
// }
// }
// }
// Call sql tool - list existing databases
// {
// "jsonrpc": "2.0",
// "id": "3",
// "method": "tools/call",
// "params": {
// "name": "db.list",
// "arguments": {}
// }
// }
// Call sql tool - list existing databases (duplicate of the previous request)
// {
// "jsonrpc": "2.0",
// "id": "3",
// "method": "tools/call",
// "params": {
// "name": "db.list",
// "arguments": {}
// }
// }
// Call sql tool - sql query, peek, schema
// {
// "jsonrpc": "2.0",
// "id": "3",
// "method": "tools/call",
// "params": {
// // postgres
// // "name": "cinema_users.sql.query",
// // "arguments": {
// // "db": "cinema_users",
// // "sql": "SELECT id, name, theater_id, points FROM members LIMIT 50"
// // }
// // // mysql
// "name": "customer_db.sql.query",
// "arguments": {
// "db": "customer_db",
// "sql": "SELECT purchase_id, user_id, item_id, total_price FROM purchase_history ORDER BY purchase_id"
// }
// // // mssql
// // "name": "mssql.sql.query",
// // "arguments": {
// // "db": "mssql",
// // "sql": "SELECT TOP 10 * FROM Doctors;"
// // }
// // // oracle
// // "name": "oracle.sql.query",
// // "arguments": {
// // "db": "oracle",
// // "sql": "SELECT * FROM COURSES"
// // }
// }
// }
```